Madan Jampani

Support an in-memory p0 partition encompassing all nodes in the cluster. This will
be used by the leadership manager and other use cases that need strong consistency
for coordination but not durable storage.

Change-Id: I8e590e46d82a3d43cae3157a04be820bb7e1b175
Showing 19 changed files with 422 additions and 311 deletions
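For reference, this change replaces the createConsistentMap/createAsyncConsistentMap factory methods on StorageService with a ConsistentMapBuilder obtained via consistentMapBuilder(). Below is a minimal usage sketch, modeled on the DistributedLeadershipManager hunk further down; the storageService reference, the Kryo-based Serializer, and the elided imports (KryoNamespace, KryoNamespaces, NodeId, Serializer, ConsistentMap) are assumed to be provided by the calling component.

    KryoNamespace kryo = new KryoNamespace.Builder()
            .register(KryoNamespaces.API)
            .build();

    // Coordination state goes to the single in-memory "p0" partition that spans
    // the whole cluster: strongly consistent, but not durable across restarts.
    ConsistentMap<String, NodeId> lockMap = storageService.<String, NodeId>consistentMapBuilder()
            .withName("onos-leader-locks")            // mandatory: unique map name
            .withSerializer(new Serializer() {        // mandatory: key/value serializer
                @Override
                public <T> byte[] encode(T object) {
                    return kryo.serialize(object);
                }

                @Override
                public <T> T decode(byte[] bytes) {
                    return kryo.deserialize(bytes);
                }
            })
            .withPartitionsDisabled()                 // route to the in-memory p0 partition
            .build();
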
...@@ -42,7 +42,9 @@ public class PartitionsListCommand extends AbstractShellCommand { ...@@ -42,7 +42,9 @@ public class PartitionsListCommand extends AbstractShellCommand {
42 * @param partitionInfo partition descriptions 42 * @param partitionInfo partition descriptions
43 */ 43 */
44 private void displayPartitions(List<PartitionInfo> partitionInfo) { 44 private void displayPartitions(List<PartitionInfo> partitionInfo) {
45 + print("----------------------------------------------------------");
45 print(FMT, "Name", "Term", "Members", ""); 46 print(FMT, "Name", "Term", "Members", "");
47 + print("----------------------------------------------------------");
46 48
47 for (PartitionInfo info : partitionInfo) { 49 for (PartitionInfo info : partitionInfo) {
48 boolean first = true; 50 boolean first = true;
...@@ -56,6 +58,9 @@ public class PartitionsListCommand extends AbstractShellCommand { ...@@ -56,6 +58,9 @@ public class PartitionsListCommand extends AbstractShellCommand {
56 member.equals(info.leader()) ? "*" : ""); 58 member.equals(info.leader()) ? "*" : "");
57 } 59 }
58 } 60 }
61 + if (!first) {
62 + print("----------------------------------------------------------");
63 + }
59 } 64 }
60 } 65 }
61 66
......
...@@ -17,8 +17,8 @@ ...@@ -17,8 +17,8 @@
17 package org.onosproject.store.service; 17 package org.onosproject.store.service;
18 18
19 import java.util.Collection; 19 import java.util.Collection;
20 -import java.util.Set;
21 import java.util.Map.Entry; 20 import java.util.Map.Entry;
21 +import java.util.Set;
22 22
23 /** 23 /**
24 * A distributed, strongly consistent map. 24 * A distributed, strongly consistent map.
......
1 +package org.onosproject.store.service;
2 +
3 +
4 +/**
5 + * Builder for consistent maps.
6 + *
7 + * @param <K> type for map key
8 + * @param <V> type for map value
9 + */
10 +public interface ConsistentMapBuilder<K, V> {
11 +
12 + /**
13 + * Sets the name of the map.
14 + * <p>
15 + * Each consistent map is identified by a unique map name.
16 + * </p>
17 + * <p>
18 + * Note: This is a mandatory parameter.
19 + * </p>
20 + *
21 + * @param name name of the consistent map
22 + * @return this ConsistentMapBuilder
23 + */
24 + public ConsistentMapBuilder<K, V> withName(String name);
25 +
26 + /**
27 + * Sets a serializer that can be used to serialize
28 + * both the keys and values inserted into the map. The serializer
29 + * builder should be pre-populated with any classes that will be
30 + * put into the map.
31 + * <p>
32 + * Note: This is a mandatory parameter.
33 + * </p>
34 + *
35 + * @param serializer serializer
36 + * @return this ConsistentMapBuilder
37 + */
38 + public ConsistentMapBuilder<K, V> withSerializer(Serializer serializer);
39 +
40 + /**
41 + * Disables distribution of map entries across multiple database partitions.
42 + * <p>
43 + * When partitioning is disabled, the returned map will have a single partition
44 + * that spans the entire cluster. Furthermore, the changes made to the map are
45 + * ephemeral and do not survive a full cluster restart.
46 + * </p>
47 + * <p>
48 + * Disabling partitions is more appropriate when the returned map is used for
49 + * coordination activities such as leader election and not for long term data persistence.
50 + * </p>
51 + * <p>
52 + * Note: By default partitions are enabled and entries in the map are durable.
53 + * </p>
54 + * @return this ConsistentMapBuilder
55 + */
56 + public ConsistentMapBuilder<K, V> withPartitionsDisabled();
57 +
58 + /**
 59 + * Builds a consistent map based on the configuration options
60 + * supplied to this builder.
61 + *
62 + * @return new consistent map
63 + * @throws java.lang.RuntimeException if a mandatory parameter is missing
64 + */
65 + public ConsistentMap<K, V> build();
66 +
67 + /**
68 + * Builds an async consistent map based on the configuration options
69 + * supplied to this builder.
70 + *
71 + * @return new async consistent map
72 + * @throws java.lang.RuntimeException if a mandatory parameter is missing
73 + */
74 + public AsyncConsistentMap<K, V> buildAsyncMap();
75 +}
...\ No newline at end of file ...\ No newline at end of file
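The same builder exposes buildAsyncMap() for non-blocking access. A hypothetical sketch follows; the map name, the prepared serializer, and the log reference are illustrative placeholders rather than part of this change. Every operation returns a CompletableFuture, and values come back wrapped in Versioned (value, version, creation time).

    AsyncConsistentMap<String, String> metadata = storageService.<String, String>consistentMapBuilder()
            .withName("onos-app-metadata")     // illustrative name, not introduced by this change
            .withSerializer(serializer)        // a Serializer prepared by the caller
            .buildAsyncMap();

    // Chain non-blocking operations; get() resolves to a Versioned<String>.
    metadata.put("owner", "onos")
            .thenCompose(previous -> metadata.get("owner"))
            .thenAccept(value -> log.info("owner is {} at version {}",
                                          value.value(), value.version()));
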
...@@ -29,33 +29,20 @@ package org.onosproject.store.service; ...@@ -29,33 +29,20 @@ package org.onosproject.store.service;
29 public interface StorageService { 29 public interface StorageService {
30 30
31 /** 31 /**
32 - * Creates a ConsistentMap. 32 + * Creates a new transaction context.
33 * 33 *
34 - * @param name map name 34 + * @return transaction context
35 - * @param serializer serializer to use for serializing keys and values
36 - * @return consistent map.
37 - * @param <K> key type
38 - * @param <V> value type
39 */ 35 */
40 - <K, V> ConsistentMap<K , V> createConsistentMap(String name, Serializer serializer); 36 + TransactionContext createTransactionContext();
41 37
42 /** 38 /**
43 - * Creates a AsyncConsistentMap. 39 + * Creates a new EventuallyConsistentMapBuilder.
44 * 40 *
45 - * @param name map name
46 - * @param serializer serializer to use for serializing keys and values
47 - * @return async consistent map
48 * @param <K> key type 41 * @param <K> key type
49 * @param <V> value type 42 * @param <V> value type
43 + * @return builder for an eventually consistent map
50 */ 44 */
51 - <K, V> AsyncConsistentMap<K , V> createAsyncConsistentMap(String name, Serializer serializer); 45 + <K, V> EventuallyConsistentMapBuilder<K, V> eventuallyConsistentMapBuilder();
52 -
53 - /**
54 - * Creates a new transaction context.
55 - *
56 - * @return transaction context
57 - */
58 - TransactionContext createTransactionContext();
59 46
60 /** 47 /**
61 * Creates a new EventuallyConsistentMapBuilder. 48 * Creates a new EventuallyConsistentMapBuilder.
...@@ -64,6 +51,5 @@ public interface StorageService { ...@@ -64,6 +51,5 @@ public interface StorageService {
64 * @param <V> value type 51 * @param <V> value type
65 * @return builder for an eventually consistent map 52 * @return builder for an eventually consistent map
66 */ 53 */
67 - <K, V> EventuallyConsistentMapBuilder<K, V> eventuallyConsistentMapBuilder(); 54 + <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder();
68 -
69 } 55 }
...\ No newline at end of file ...\ No newline at end of file
......
...@@ -81,5 +81,4 @@ public interface Database extends DatabaseProxy<String, byte[]>, Resource<Databa ...@@ -81,5 +81,4 @@ public interface Database extends DatabaseProxy<String, byte[]>, Resource<Databa
81 .addStartupTask(() -> coordinator.open().thenApply(v -> null)) 81 .addStartupTask(() -> coordinator.open().thenApply(v -> null))
82 .addShutdownTask(coordinator::close); 82 .addShutdownTask(coordinator::close);
83 } 83 }
84 -
85 } 84 }
......
...@@ -36,6 +36,8 @@ public class DatabaseConfig extends ResourceConfig<DatabaseConfig> { ...@@ -36,6 +36,8 @@ public class DatabaseConfig extends ResourceConfig<DatabaseConfig> {
36 private static final String DEFAULT_CONFIGURATION = "database-defaults"; 36 private static final String DEFAULT_CONFIGURATION = "database-defaults";
37 private static final String CONFIGURATION = "database"; 37 private static final String CONFIGURATION = "database";
38 38
39 + private String name;
40 +
39 public DatabaseConfig() { 41 public DatabaseConfig() {
40 super(CONFIGURATION, DEFAULT_CONFIGURATION); 42 super(CONFIGURATION, DEFAULT_CONFIGURATION);
41 } 43 }
...@@ -114,6 +116,37 @@ public class DatabaseConfig extends ResourceConfig<DatabaseConfig> { ...@@ -114,6 +116,37 @@ public class DatabaseConfig extends ResourceConfig<DatabaseConfig> {
114 return this; 116 return this;
115 } 117 }
116 118
119 + /**
120 + * Returns the database name.
121 + *
122 + * @return The database name
123 + */
124 + public String getName() {
125 + return name;
126 + }
127 +
128 + /**
129 + * Sets the database name, returning the configuration for method chaining.
130 + *
131 + * @param name The database name
132 + * @return The database configuration
133 + * @throws java.lang.NullPointerException If the name is {@code null}
134 + */
135 + public DatabaseConfig withName(String name) {
136 + setName(Assert.isNotNull(name, "name"));
137 + return this;
138 + }
139 +
140 + /**
141 + * Sets the database name.
142 + *
143 + * @param name The database name
144 + * @throws java.lang.NullPointerException If the name is {@code null}
145 + */
146 + public void setName(String name) {
147 + this.name = Assert.isNotNull(name, "name");
148 + }
149 +
117 @Override 150 @Override
118 public CoordinatedResourceConfig resolve(ClusterConfig cluster) { 151 public CoordinatedResourceConfig resolve(ClusterConfig cluster) {
119 return new StateLogConfig(toMap()) 152 return new StateLogConfig(toMap())
......
...@@ -16,12 +16,23 @@ ...@@ -16,12 +16,23 @@
16 16
17 package org.onosproject.store.consistent.impl; 17 package org.onosproject.store.consistent.impl;
18 18
19 +import com.google.common.collect.Lists;
19 import com.google.common.collect.Sets; 20 import com.google.common.collect.Sets;
21 +
22 +import net.kuujo.copycat.CopycatConfig;
20 import net.kuujo.copycat.cluster.ClusterConfig; 23 import net.kuujo.copycat.cluster.ClusterConfig;
21 import net.kuujo.copycat.cluster.Member; 24 import net.kuujo.copycat.cluster.Member;
25 +import net.kuujo.copycat.cluster.Member.Type;
26 +import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator;
27 +import net.kuujo.copycat.cluster.internal.coordinator.DefaultClusterCoordinator;
28 +import net.kuujo.copycat.log.BufferedLog;
22 import net.kuujo.copycat.log.FileLog; 29 import net.kuujo.copycat.log.FileLog;
30 +import net.kuujo.copycat.log.Log;
23 import net.kuujo.copycat.netty.NettyTcpProtocol; 31 import net.kuujo.copycat.netty.NettyTcpProtocol;
24 import net.kuujo.copycat.protocol.Consistency; 32 import net.kuujo.copycat.protocol.Consistency;
33 +import net.kuujo.copycat.protocol.Protocol;
34 +import net.kuujo.copycat.util.concurrent.NamedThreadFactory;
35 +
25 import org.apache.felix.scr.annotations.Activate; 36 import org.apache.felix.scr.annotations.Activate;
26 import org.apache.felix.scr.annotations.Component; 37 import org.apache.felix.scr.annotations.Component;
27 import org.apache.felix.scr.annotations.Deactivate; 38 import org.apache.felix.scr.annotations.Deactivate;
...@@ -32,11 +43,9 @@ import org.onosproject.cluster.ClusterService; ...@@ -32,11 +43,9 @@ import org.onosproject.cluster.ClusterService;
32 import org.onosproject.store.cluster.impl.NodeInfo; 43 import org.onosproject.store.cluster.impl.NodeInfo;
33 import org.onosproject.store.cluster.messaging.ClusterCommunicationService; 44 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
34 import org.onosproject.store.ecmap.EventuallyConsistentMapBuilderImpl; 45 import org.onosproject.store.ecmap.EventuallyConsistentMapBuilderImpl;
35 -import org.onosproject.store.service.AsyncConsistentMap; 46 +import org.onosproject.store.service.ConsistentMapBuilder;
36 -import org.onosproject.store.service.ConsistentMap;
37 import org.onosproject.store.service.EventuallyConsistentMapBuilder; 47 import org.onosproject.store.service.EventuallyConsistentMapBuilder;
38 import org.onosproject.store.service.PartitionInfo; 48 import org.onosproject.store.service.PartitionInfo;
39 -import org.onosproject.store.service.Serializer;
40 import org.onosproject.store.service.StorageAdminService; 49 import org.onosproject.store.service.StorageAdminService;
41 import org.onosproject.store.service.StorageService; 50 import org.onosproject.store.service.StorageService;
42 import org.onosproject.store.service.TransactionContext; 51 import org.onosproject.store.service.TransactionContext;
...@@ -47,7 +56,9 @@ import java.io.IOException; ...@@ -47,7 +56,9 @@ import java.io.IOException;
47 import java.util.List; 56 import java.util.List;
48 import java.util.Map; 57 import java.util.Map;
49 import java.util.Set; 58 import java.util.Set;
59 +import java.util.concurrent.CompletableFuture;
50 import java.util.concurrent.CountDownLatch; 60 import java.util.concurrent.CountDownLatch;
61 +import java.util.concurrent.Executors;
51 import java.util.concurrent.TimeUnit; 62 import java.util.concurrent.TimeUnit;
52 import java.util.stream.Collectors; 63 import java.util.stream.Collectors;
53 64
...@@ -61,13 +72,16 @@ import static org.slf4j.LoggerFactory.getLogger; ...@@ -61,13 +72,16 @@ import static org.slf4j.LoggerFactory.getLogger;
61 public class DatabaseManager implements StorageService, StorageAdminService { 72 public class DatabaseManager implements StorageService, StorageAdminService {
62 73
63 private final Logger log = getLogger(getClass()); 74 private final Logger log = getLogger(getClass());
75 + private ClusterCoordinator coordinator;
64 private PartitionedDatabase partitionedDatabase; 76 private PartitionedDatabase partitionedDatabase;
77 + private Database inMemoryDatabase;
65 public static final int COPYCAT_TCP_PORT = 7238; // 7238 = RAFT 78 public static final int COPYCAT_TCP_PORT = 7238; // 7238 = RAFT
66 private static final String CONFIG_DIR = "../config"; 79 private static final String CONFIG_DIR = "../config";
67 private static final String PARTITION_DEFINITION_FILE = "tablets.json"; 80 private static final String PARTITION_DEFINITION_FILE = "tablets.json";
68 private static final int DATABASE_STARTUP_TIMEOUT_SEC = 60; 81 private static final int DATABASE_STARTUP_TIMEOUT_SEC = 60;
69 - 82 + public static final String BASE_PARTITION_NAME = "p0";
70 - private final PartitionedDatabaseConfig databaseConfig = new PartitionedDatabaseConfig(); 83 + private static final int RAFT_ELECTION_TIMEOUT = 3000;
84 + private static final int RAFT_HEARTBEAT_TIMEOUT = 1500;
71 85
72 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 86 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
73 protected ClusterService clusterService; 87 protected ClusterService clusterService;
...@@ -82,8 +96,6 @@ public class DatabaseManager implements StorageService, StorageAdminService { ...@@ -82,8 +96,6 @@ public class DatabaseManager implements StorageService, StorageAdminService {
82 @Activate 96 @Activate
83 public void activate() { 97 public void activate() {
84 98
85 - final String logDir = System.getProperty("karaf.data", "./data");
86 -
87 // load database configuration 99 // load database configuration
88 File file = new File(CONFIG_DIR, PARTITION_DEFINITION_FILE); 100 File file = new File(CONFIG_DIR, PARTITION_DEFINITION_FILE);
89 log.info("Loading database definition: {}", file.getAbsolutePath()); 101 log.info("Loading database definition: {}", file.getAbsolutePath());
...@@ -107,47 +119,56 @@ public class DatabaseManager implements StorageService, StorageAdminService { ...@@ -107,47 +119,56 @@ public class DatabaseManager implements StorageService, StorageAdminService {
107 String localNodeUri = nodeToUri(NodeInfo.of(clusterService.getLocalNode())); 119 String localNodeUri = nodeToUri(NodeInfo.of(clusterService.getLocalNode()));
108 120
109 ClusterConfig clusterConfig = new ClusterConfig() 121 ClusterConfig clusterConfig = new ClusterConfig()
110 - .withProtocol(new NettyTcpProtocol() 122 + .withProtocol(newNettyProtocol())
111 - .withSsl(false) 123 + .withElectionTimeout(RAFT_ELECTION_TIMEOUT)
112 - .withConnectTimeout(60000) 124 + .withHeartbeatInterval(RAFT_HEARTBEAT_TIMEOUT)
113 - .withAcceptBacklog(1024)
114 - .withTrafficClass(-1)
115 - .withSoLinger(-1)
116 - .withReceiveBufferSize(32768)
117 - .withSendBufferSize(8192)
118 - .withThreads(1))
119 - .withElectionTimeout(3000)
120 - .withHeartbeatInterval(1500)
121 .withMembers(activeNodeUris) 125 .withMembers(activeNodeUris)
122 .withLocalMember(localNodeUri); 126 .withLocalMember(localNodeUri);
123 127
124 - partitionMap.forEach((name, nodes) -> { 128 + CopycatConfig copycatConfig = new CopycatConfig()
125 - Set<String> replicas = nodes.stream().map(this::nodeToUri).collect(Collectors.toSet()); 129 + .withName("onos")
126 - DatabaseConfig partitionConfig = new DatabaseConfig() 130 + .withClusterConfig(clusterConfig)
127 - .withElectionTimeout(3000)
128 - .withHeartbeatInterval(1500)
129 - .withConsistency(Consistency.STRONG)
130 - .withLog(new FileLog()
131 - .withDirectory(logDir)
132 - .withSegmentSize(1073741824) // 1GB
133 - .withFlushOnWrite(true)
134 - .withSegmentInterval(Long.MAX_VALUE))
135 .withDefaultSerializer(new DatabaseSerializer()) 131 .withDefaultSerializer(new DatabaseSerializer())
136 - .withReplicas(replicas); 132 + .withDefaultExecutor(Executors.newSingleThreadExecutor(new NamedThreadFactory("copycat-coordinator-%d")));
137 - databaseConfig.addPartition(name, partitionConfig); 133 +
138 - }); 134 + coordinator = new DefaultClusterCoordinator(copycatConfig.resolve());
135 +
136 + DatabaseConfig inMemoryDatabaseConfig =
137 + newDatabaseConfig(BASE_PARTITION_NAME, newInMemoryLog(), activeNodeUris);
138 + inMemoryDatabase = coordinator
139 + .getResource(inMemoryDatabaseConfig.getName(), inMemoryDatabaseConfig.resolve(clusterConfig)
140 + .withSerializer(copycatConfig.getDefaultSerializer())
141 + .withDefaultExecutor(copycatConfig.getDefaultExecutor()));
142 +
143 + List<Database> partitions = partitionMap.entrySet()
144 + .stream()
145 + .map(entry -> {
146 + String[] replicas = entry.getValue().stream().map(this::nodeToUri).toArray(String[]::new);
147 + return newDatabaseConfig(entry.getKey(), newPersistentLog(), replicas);
148 + })
149 + .map(config -> {
150 + Database db = coordinator.getResource(config.getName(), config.resolve(clusterConfig)
151 + .withSerializer(copycatConfig.getDefaultSerializer())
152 + .withDefaultExecutor(copycatConfig.getDefaultExecutor()));
153 + return db;
154 + })
155 + .collect(Collectors.toList());
139 156
140 - partitionedDatabase = PartitionedDatabaseManager.create("onos-store", clusterConfig, databaseConfig); 157 + partitionedDatabase = new PartitionedDatabase("onos-store", partitions);
141 158
142 CountDownLatch latch = new CountDownLatch(1); 159 CountDownLatch latch = new CountDownLatch(1);
143 - partitionedDatabase.open().whenComplete((db, error) -> { 160 +
161 + coordinator.open()
162 + .thenCompose(v -> CompletableFuture.allOf(inMemoryDatabase.open(), partitionedDatabase.open())
163 + .whenComplete((db, error) -> {
144 if (error != null) { 164 if (error != null) {
145 - log.warn("Failed to open database.", error); 165 + log.warn("Failed to create databases.", error);
146 } else { 166 } else {
147 latch.countDown(); 167 latch.countDown();
148 - log.info("Successfully opened database."); 168 + log.info("Successfully created databases.");
149 } 169 }
150 - }); 170 + }));
171 +
151 try { 172 try {
152 if (!latch.await(DATABASE_STARTUP_TIMEOUT_SEC, TimeUnit.SECONDS)) { 173 if (!latch.await(DATABASE_STARTUP_TIMEOUT_SEC, TimeUnit.SECONDS)) {
153 log.warn("Timed out waiting for database to initialize."); 174 log.warn("Timed out waiting for database to initialize.");
...@@ -161,52 +182,87 @@ public class DatabaseManager implements StorageService, StorageAdminService { ...@@ -161,52 +182,87 @@ public class DatabaseManager implements StorageService, StorageAdminService {
161 182
162 @Deactivate 183 @Deactivate
163 public void deactivate() { 184 public void deactivate() {
164 - partitionedDatabase.close().whenComplete((result, error) -> { 185 + CompletableFuture.allOf(inMemoryDatabase.close(), partitionedDatabase.close())
186 + .thenCompose(v -> coordinator.close())
187 + .whenComplete((result, error) -> {
165 if (error != null) { 188 if (error != null) {
166 - log.warn("Failed to cleanly close database.", error); 189 + log.warn("Failed to cleanly close databases.", error);
167 } else { 190 } else {
168 - log.info("Successfully closed database."); 191 + log.info("Successfully closed databases.");
169 } 192 }
170 }); 193 });
171 log.info("Stopped"); 194 log.info("Stopped");
172 } 195 }
173 196
174 @Override 197 @Override
175 - public <K, V> ConsistentMap<K , V> createConsistentMap(String name, Serializer serializer) {
176 - return new DefaultConsistentMap<>(name, partitionedDatabase, serializer);
177 - }
178 -
179 - @Override
180 - public <K, V> AsyncConsistentMap<K , V> createAsyncConsistentMap(String name, Serializer serializer) {
181 - return new DefaultAsyncConsistentMap<>(name, partitionedDatabase, serializer);
182 - }
183 -
184 - @Override
185 public TransactionContext createTransactionContext() { 198 public TransactionContext createTransactionContext() {
186 return new DefaultTransactionContext(partitionedDatabase); 199 return new DefaultTransactionContext(partitionedDatabase);
187 } 200 }
188 201
189 @Override 202 @Override
190 public List<PartitionInfo> getPartitionInfo() { 203 public List<PartitionInfo> getPartitionInfo() {
191 - return partitionedDatabase.getRegisteredPartitions() 204 + return Lists.asList(
192 - .values() 205 + inMemoryDatabase,
206 + partitionedDatabase.getPartitions().toArray(new Database[]{}))
193 .stream() 207 .stream()
194 - .map(db -> toPartitionInfo(db, databaseConfig.partitions().get(db.name()))) 208 + .map(DatabaseManager::toPartitionInfo)
195 .collect(Collectors.toList()); 209 .collect(Collectors.toList());
196 } 210 }
197 211
212 + private Protocol newNettyProtocol() {
213 + return new NettyTcpProtocol()
214 + .withSsl(false)
215 + .withConnectTimeout(60000)
216 + .withAcceptBacklog(1024)
217 + .withTrafficClass(-1)
218 + .withSoLinger(-1)
219 + .withReceiveBufferSize(32768)
220 + .withSendBufferSize(8192)
221 + .withThreads(1);
222 + }
223 +
224 + private Log newPersistentLog() {
225 + String logDir = System.getProperty("karaf.data", "./data");
226 + return new FileLog()
227 + .withDirectory(logDir)
228 + .withSegmentSize(1073741824) // 1GB
229 + .withFlushOnWrite(true)
230 + .withSegmentInterval(Long.MAX_VALUE);
231 + }
232 +
233 + private Log newInMemoryLog() {
234 + return new BufferedLog()
235 + .withFlushOnWrite(false)
236 + .withFlushInterval(Long.MAX_VALUE)
237 + .withSegmentSize(10485760) // 10MB
238 + .withSegmentInterval(Long.MAX_VALUE);
239 + }
240 +
241 + private DatabaseConfig newDatabaseConfig(String name, Log log, String[] replicas) {
242 + return new DatabaseConfig()
243 + .withName(name)
244 + .withElectionTimeout(RAFT_ELECTION_TIMEOUT)
245 + .withHeartbeatInterval(RAFT_HEARTBEAT_TIMEOUT)
246 + .withConsistency(Consistency.STRONG)
247 + .withLog(log)
248 + .withDefaultSerializer(new DatabaseSerializer())
249 + .withReplicas(replicas);
250 + }
251 +
198 /** 252 /**
199 * Maps a Raft Database object to a PartitionInfo object. 253 * Maps a Raft Database object to a PartitionInfo object.
200 * 254 *
201 * @param database database containing input data 255 * @param database database containing input data
202 * @return PartitionInfo object 256 * @return PartitionInfo object
203 */ 257 */
204 - private static PartitionInfo toPartitionInfo(Database database, DatabaseConfig dbConfig) { 258 + private static PartitionInfo toPartitionInfo(Database database) {
205 return new PartitionInfo(database.name(), 259 return new PartitionInfo(database.name(),
206 database.cluster().term(), 260 database.cluster().term(),
207 - database.cluster().members().stream() 261 + database.cluster().members()
262 + .stream()
263 + .filter(member -> Type.ACTIVE.equals(member.type()))
208 .map(Member::uri) 264 .map(Member::uri)
209 - .filter(uri -> dbConfig.getReplicas().contains(uri)) 265 + .sorted()
210 .collect(Collectors.toList()), 266 .collect(Collectors.toList()),
211 database.cluster().leader() != null ? 267 database.cluster().leader() != null ?
212 database.cluster().leader().uri() : null); 268 database.cluster().leader().uri() : null);
...@@ -219,4 +275,8 @@ public class DatabaseManager implements StorageService, StorageAdminService { ...@@ -219,4 +275,8 @@ public class DatabaseManager implements StorageService, StorageAdminService {
219 clusterCommunicator); 275 clusterCommunicator);
220 } 276 }
221 277
278 + @Override
279 + public <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder() {
280 + return new DefaultConsistentMapBuilder<>(inMemoryDatabase, partitionedDatabase);
281 + }
222 } 282 }
...\ No newline at end of file ...\ No newline at end of file
......
...@@ -18,10 +18,9 @@ package org.onosproject.store.consistent.impl; ...@@ -18,10 +18,9 @@ package org.onosproject.store.consistent.impl;
18 18
19 import static com.google.common.base.Preconditions.checkState; 19 import static com.google.common.base.Preconditions.checkState;
20 20
21 -import java.util.Map; 21 +import java.util.List;
22 -
23 import com.google.common.base.Charsets; 22 import com.google.common.base.Charsets;
24 -import com.google.common.collect.ImmutableSortedMap; 23 +import com.google.common.collect.ImmutableList;
25 import com.google.common.hash.Hashing; 24 import com.google.common.hash.Hashing;
26 25
27 /** 26 /**
...@@ -32,11 +31,11 @@ import com.google.common.hash.Hashing; ...@@ -32,11 +31,11 @@ import com.google.common.hash.Hashing;
32 */ 31 */
33 public abstract class DatabasePartitioner implements Partitioner<String> { 32 public abstract class DatabasePartitioner implements Partitioner<String> {
34 // Database partitions sorted by their partition name. 33 // Database partitions sorted by their partition name.
35 - protected final Database[] sortedPartitions; 34 + protected final List<Database> partitions;
36 35
37 - public DatabasePartitioner(Map<String, Database> partitionMap) { 36 + public DatabasePartitioner(List<Database> partitions) {
38 - checkState(partitionMap != null && !partitionMap.isEmpty(), "Partition map cannot be null or empty"); 37 + checkState(partitions != null && !partitions.isEmpty(), "Partitions cannot be null or empty");
39 - sortedPartitions = ImmutableSortedMap.<String, Database>copyOf(partitionMap).values().toArray(new Database[]{}); 38 + this.partitions = ImmutableList.copyOf(partitions);
40 } 39 }
41 40
42 protected int hash(String key) { 41 protected int hash(String key) {
......
...@@ -45,7 +45,7 @@ import com.google.common.cache.LoadingCache; ...@@ -45,7 +45,7 @@ import com.google.common.cache.LoadingCache;
45 public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> { 45 public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> {
46 46
47 private final String name; 47 private final String name;
48 - private final DatabaseProxy<String, byte[]> proxy; 48 + private final Database database;
49 private final Serializer serializer; 49 private final Serializer serializer;
50 50
51 private static final String ERROR_NULL_KEY = "Key cannot be null"; 51 private static final String ERROR_NULL_KEY = "Key cannot be null";
...@@ -66,39 +66,39 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> ...@@ -66,39 +66,39 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
66 } 66 }
67 67
68 public DefaultAsyncConsistentMap(String name, 68 public DefaultAsyncConsistentMap(String name,
69 - DatabaseProxy<String, byte[]> proxy, 69 + Database database,
70 Serializer serializer) { 70 Serializer serializer) {
71 this.name = checkNotNull(name, "map name cannot be null"); 71 this.name = checkNotNull(name, "map name cannot be null");
72 - this.proxy = checkNotNull(proxy, "database proxy cannot be null"); 72 + this.database = checkNotNull(database, "database cannot be null");
73 this.serializer = checkNotNull(serializer, "serializer cannot be null"); 73 this.serializer = checkNotNull(serializer, "serializer cannot be null");
74 } 74 }
75 75
76 @Override 76 @Override
77 public CompletableFuture<Integer> size() { 77 public CompletableFuture<Integer> size() {
78 - return proxy.size(name); 78 + return database.size(name);
79 } 79 }
80 80
81 @Override 81 @Override
82 public CompletableFuture<Boolean> isEmpty() { 82 public CompletableFuture<Boolean> isEmpty() {
83 - return proxy.isEmpty(name); 83 + return database.isEmpty(name);
84 } 84 }
85 85
86 @Override 86 @Override
87 public CompletableFuture<Boolean> containsKey(K key) { 87 public CompletableFuture<Boolean> containsKey(K key) {
88 checkNotNull(key, ERROR_NULL_KEY); 88 checkNotNull(key, ERROR_NULL_KEY);
89 - return proxy.containsKey(name, keyCache.getUnchecked(key)); 89 + return database.containsKey(name, keyCache.getUnchecked(key));
90 } 90 }
91 91
92 @Override 92 @Override
93 public CompletableFuture<Boolean> containsValue(V value) { 93 public CompletableFuture<Boolean> containsValue(V value) {
94 checkNotNull(value, ERROR_NULL_VALUE); 94 checkNotNull(value, ERROR_NULL_VALUE);
95 - return proxy.containsValue(name, serializer.encode(value)); 95 + return database.containsValue(name, serializer.encode(value));
96 } 96 }
97 97
98 @Override 98 @Override
99 public CompletableFuture<Versioned<V>> get(K key) { 99 public CompletableFuture<Versioned<V>> get(K key) {
100 checkNotNull(key, ERROR_NULL_KEY); 100 checkNotNull(key, ERROR_NULL_KEY);
101 - return proxy.get(name, keyCache.getUnchecked(key)) 101 + return database.get(name, keyCache.getUnchecked(key))
102 .thenApply(v -> v != null 102 .thenApply(v -> v != null
103 ? new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null); 103 ? new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
104 } 104 }
...@@ -107,7 +107,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> ...@@ -107,7 +107,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
107 public CompletableFuture<Versioned<V>> put(K key, V value) { 107 public CompletableFuture<Versioned<V>> put(K key, V value) {
108 checkNotNull(key, ERROR_NULL_KEY); 108 checkNotNull(key, ERROR_NULL_KEY);
109 checkNotNull(value, ERROR_NULL_VALUE); 109 checkNotNull(value, ERROR_NULL_VALUE);
110 - return proxy.put(name, keyCache.getUnchecked(key), serializer.encode(value)) 110 + return database.put(name, keyCache.getUnchecked(key), serializer.encode(value))
111 .thenApply(v -> v != null 111 .thenApply(v -> v != null
112 ? new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null); 112 ? new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
113 } 113 }
...@@ -115,19 +115,19 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> ...@@ -115,19 +115,19 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
115 @Override 115 @Override
116 public CompletableFuture<Versioned<V>> remove(K key) { 116 public CompletableFuture<Versioned<V>> remove(K key) {
117 checkNotNull(key, ERROR_NULL_KEY); 117 checkNotNull(key, ERROR_NULL_KEY);
118 - return proxy.remove(name, keyCache.getUnchecked(key)) 118 + return database.remove(name, keyCache.getUnchecked(key))
119 .thenApply(v -> v != null 119 .thenApply(v -> v != null
120 ? new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null); 120 ? new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
121 } 121 }
122 122
123 @Override 123 @Override
124 public CompletableFuture<Void> clear() { 124 public CompletableFuture<Void> clear() {
125 - return proxy.clear(name); 125 + return database.clear(name);
126 } 126 }
127 127
128 @Override 128 @Override
129 public CompletableFuture<Set<K>> keySet() { 129 public CompletableFuture<Set<K>> keySet() {
130 - return proxy.keySet(name) 130 + return database.keySet(name)
131 .thenApply(s -> s 131 .thenApply(s -> s
132 .stream() 132 .stream()
133 .map(this::dK) 133 .map(this::dK)
...@@ -136,7 +136,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> ...@@ -136,7 +136,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
136 136
137 @Override 137 @Override
138 public CompletableFuture<Collection<Versioned<V>>> values() { 138 public CompletableFuture<Collection<Versioned<V>>> values() {
139 - return proxy.values(name).thenApply(c -> c 139 + return database.values(name).thenApply(c -> c
140 .stream() 140 .stream()
141 .map(v -> new Versioned<V>(serializer.decode(v.value()), v.version(), v.creationTime())) 141 .map(v -> new Versioned<V>(serializer.decode(v.value()), v.version(), v.creationTime()))
142 .collect(Collectors.toList())); 142 .collect(Collectors.toList()));
...@@ -144,7 +144,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> ...@@ -144,7 +144,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
144 144
145 @Override 145 @Override
146 public CompletableFuture<Set<Entry<K, Versioned<V>>>> entrySet() { 146 public CompletableFuture<Set<Entry<K, Versioned<V>>>> entrySet() {
147 - return proxy.entrySet(name).thenApply(s -> s 147 + return database.entrySet(name).thenApply(s -> s
148 .stream() 148 .stream()
149 .map(this::fromRawEntry) 149 .map(this::fromRawEntry)
150 .collect(Collectors.toSet())); 150 .collect(Collectors.toSet()));
...@@ -154,7 +154,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> ...@@ -154,7 +154,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
154 public CompletableFuture<Versioned<V>> putIfAbsent(K key, V value) { 154 public CompletableFuture<Versioned<V>> putIfAbsent(K key, V value) {
155 checkNotNull(key, ERROR_NULL_KEY); 155 checkNotNull(key, ERROR_NULL_KEY);
156 checkNotNull(value, ERROR_NULL_VALUE); 156 checkNotNull(value, ERROR_NULL_VALUE);
157 - return proxy.putIfAbsent( 157 + return database.putIfAbsent(
158 name, keyCache.getUnchecked(key), serializer.encode(value)).thenApply(v -> 158 name, keyCache.getUnchecked(key), serializer.encode(value)).thenApply(v ->
159 v != null ? 159 v != null ?
160 new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null); 160 new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
...@@ -164,13 +164,13 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> ...@@ -164,13 +164,13 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
164 public CompletableFuture<Boolean> remove(K key, V value) { 164 public CompletableFuture<Boolean> remove(K key, V value) {
165 checkNotNull(key, ERROR_NULL_KEY); 165 checkNotNull(key, ERROR_NULL_KEY);
166 checkNotNull(value, ERROR_NULL_VALUE); 166 checkNotNull(value, ERROR_NULL_VALUE);
167 - return proxy.remove(name, keyCache.getUnchecked(key), serializer.encode(value)); 167 + return database.remove(name, keyCache.getUnchecked(key), serializer.encode(value));
168 } 168 }
169 169
170 @Override 170 @Override
171 public CompletableFuture<Boolean> remove(K key, long version) { 171 public CompletableFuture<Boolean> remove(K key, long version) {
172 checkNotNull(key, ERROR_NULL_KEY); 172 checkNotNull(key, ERROR_NULL_KEY);
173 - return proxy.remove(name, keyCache.getUnchecked(key), version); 173 + return database.remove(name, keyCache.getUnchecked(key), version);
174 174
175 } 175 }
176 176
...@@ -179,14 +179,14 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> ...@@ -179,14 +179,14 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
179 checkNotNull(key, ERROR_NULL_KEY); 179 checkNotNull(key, ERROR_NULL_KEY);
180 checkNotNull(newValue, ERROR_NULL_VALUE); 180 checkNotNull(newValue, ERROR_NULL_VALUE);
181 byte[] existing = oldValue != null ? serializer.encode(oldValue) : null; 181 byte[] existing = oldValue != null ? serializer.encode(oldValue) : null;
182 - return proxy.replace(name, keyCache.getUnchecked(key), existing, serializer.encode(newValue)); 182 + return database.replace(name, keyCache.getUnchecked(key), existing, serializer.encode(newValue));
183 } 183 }
184 184
185 @Override 185 @Override
186 public CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue) { 186 public CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue) {
187 checkNotNull(key, ERROR_NULL_KEY); 187 checkNotNull(key, ERROR_NULL_KEY);
188 checkNotNull(newValue, ERROR_NULL_VALUE); 188 checkNotNull(newValue, ERROR_NULL_VALUE);
189 - return proxy.replace(name, keyCache.getUnchecked(key), oldVersion, serializer.encode(newValue)); 189 + return database.replace(name, keyCache.getUnchecked(key), oldVersion, serializer.encode(newValue));
190 } 190 }
191 191
192 private Map.Entry<K, Versioned<V>> fromRawEntry(Map.Entry<String, Versioned<byte[]>> e) { 192 private Map.Entry<K, Versioned<V>> fromRawEntry(Map.Entry<String, Versioned<byte[]>> e) {
......
...@@ -44,9 +44,9 @@ public class DefaultConsistentMap<K, V> implements ConsistentMap<K, V> { ...@@ -44,9 +44,9 @@ public class DefaultConsistentMap<K, V> implements ConsistentMap<K, V> {
44 private final AsyncConsistentMap<K, V> asyncMap; 44 private final AsyncConsistentMap<K, V> asyncMap;
45 45
46 public DefaultConsistentMap(String name, 46 public DefaultConsistentMap(String name,
47 - DatabaseProxy<String, byte[]> proxy, 47 + Database database,
48 Serializer serializer) { 48 Serializer serializer) {
49 - asyncMap = new DefaultAsyncConsistentMap<>(name, proxy, serializer); 49 + asyncMap = new DefaultAsyncConsistentMap<>(name, database, serializer);
50 } 50 }
51 51
52 @Override 52 @Override
......
1 +package org.onosproject.store.consistent.impl;
2 +
3 +import static com.google.common.base.Preconditions.checkArgument;
4 +import static com.google.common.base.Preconditions.checkState;
5 +
6 +import org.onosproject.store.service.AsyncConsistentMap;
7 +import org.onosproject.store.service.ConsistentMap;
8 +import org.onosproject.store.service.ConsistentMapBuilder;
9 +import org.onosproject.store.service.Serializer;
10 +
11 +/**
12 + * Default Consistent Map builder.
13 + *
14 + * @param <K> type for map key
15 + * @param <V> type for map value
16 + */
17 +public class DefaultConsistentMapBuilder<K, V> implements ConsistentMapBuilder<K, V> {
18 +
19 + private Serializer serializer;
20 + private String name;
21 + private boolean partitionsEnabled = true;
22 + private final Database partitionedDatabase;
23 + private final Database inMemoryDatabase;
24 +
25 + public DefaultConsistentMapBuilder(Database inMemoryDatabase, Database partitionedDatabase) {
26 + this.inMemoryDatabase = inMemoryDatabase;
27 + this.partitionedDatabase = partitionedDatabase;
28 + }
29 +
30 + @Override
31 + public ConsistentMapBuilder<K, V> withName(String name) {
32 + checkArgument(name != null && !name.isEmpty());
33 + this.name = name;
34 + return this;
35 + }
36 +
37 + @Override
38 + public ConsistentMapBuilder<K, V> withSerializer(Serializer serializer) {
39 + checkArgument(serializer != null);
40 + this.serializer = serializer;
41 + return this;
42 + }
43 +
44 + @Override
45 + public ConsistentMapBuilder<K, V> withPartitionsDisabled() {
46 + partitionsEnabled = false;
47 + return this;
48 + }
49 +
50 + private boolean validInputs() {
51 + return name != null && serializer != null;
52 + }
53 +
54 + @Override
55 + public ConsistentMap<K, V> build() {
56 + checkState(validInputs());
57 + return new DefaultConsistentMap<>(
58 + name,
59 + partitionsEnabled ? partitionedDatabase : inMemoryDatabase,
60 + serializer);
61 + }
62 +
63 + @Override
64 + public AsyncConsistentMap<K, V> buildAsyncMap() {
65 + checkState(validInputs());
66 + return new DefaultAsyncConsistentMap<>(
67 + name,
68 + partitionsEnabled ? partitionedDatabase : inMemoryDatabase,
69 + serializer);
70 + }
71 +}
...\ No newline at end of file ...\ No newline at end of file
...@@ -162,4 +162,17 @@ public class DefaultDatabase extends AbstractResource<Database> implements Datab ...@@ -162,4 +162,17 @@ public class DefaultDatabase extends AbstractResource<Database> implements Datab
162 return stateMachine.close() 162 return stateMachine.close()
163 .thenCompose(v -> runShutdownTasks()); 163 .thenCompose(v -> runShutdownTasks());
164 } 164 }
165 +
166 + @Override
167 + public int hashCode() {
168 + return name().hashCode();
169 + }
170 +
171 + @Override
172 + public boolean equals(Object other) {
173 + if (other instanceof Database) {
174 + return name().equals(((Database) other).name());
175 + }
176 + return false;
177 + }
165 } 178 }
...\ No newline at end of file ...\ No newline at end of file
......
...@@ -42,12 +42,12 @@ public class DefaultTransactionContext implements TransactionContext { ...@@ -42,12 +42,12 @@ public class DefaultTransactionContext implements TransactionContext {
42 42
43 private final Map<String, DefaultTransactionalMap> txMaps = Maps.newHashMap(); 43 private final Map<String, DefaultTransactionalMap> txMaps = Maps.newHashMap();
44 private boolean isOpen = false; 44 private boolean isOpen = false;
45 - DatabaseProxy<String, byte[]> databaseProxy; 45 + private final Database database;
46 private static final String TX_NOT_OPEN_ERROR = "Transaction is not open"; 46 private static final String TX_NOT_OPEN_ERROR = "Transaction is not open";
47 private static final int TRANSACTION_TIMEOUT_MILLIS = 2000; 47 private static final int TRANSACTION_TIMEOUT_MILLIS = 2000;
48 48
49 - DefaultTransactionContext(DatabaseProxy<String, byte[]> proxy) { 49 + DefaultTransactionContext(Database database) {
50 - this.databaseProxy = proxy; 50 + this.database = checkNotNull(database, "Database must not be null");
51 } 51 }
52 52
53 @Override 53 @Override
...@@ -63,7 +63,7 @@ public class DefaultTransactionContext implements TransactionContext { ...@@ -63,7 +63,7 @@ public class DefaultTransactionContext implements TransactionContext {
63 checkNotNull(serializer, "serializer is null"); 63 checkNotNull(serializer, "serializer is null");
64 checkState(isOpen, TX_NOT_OPEN_ERROR); 64 checkState(isOpen, TX_NOT_OPEN_ERROR);
65 if (!txMaps.containsKey(mapName)) { 65 if (!txMaps.containsKey(mapName)) {
66 - ConsistentMap<K, V> backingMap = new DefaultConsistentMap<>(mapName, databaseProxy, serializer); 66 + ConsistentMap<K, V> backingMap = new DefaultConsistentMap<>(mapName, database, serializer);
67 DefaultTransactionalMap<K, V> txMap = new DefaultTransactionalMap<>(mapName, backingMap, this, serializer); 67 DefaultTransactionalMap<K, V> txMap = new DefaultTransactionalMap<>(mapName, backingMap, this, serializer);
68 txMaps.put(mapName, txMap); 68 txMaps.put(mapName, txMap);
69 } 69 }
...@@ -83,7 +83,7 @@ public class DefaultTransactionContext implements TransactionContext { ...@@ -83,7 +83,7 @@ public class DefaultTransactionContext implements TransactionContext {
83 allUpdates.addAll(m.prepareDatabaseUpdates()); 83 allUpdates.addAll(m.prepareDatabaseUpdates());
84 }); 84 });
85 85
86 - if (!complete(databaseProxy.atomicBatchUpdate(allUpdates))) { 86 + if (!complete(database.atomicBatchUpdate(allUpdates))) {
87 throw new TransactionException.OptimisticConcurrencyFailure(); 87 throw new TransactionException.OptimisticConcurrencyFailure();
88 } 88 }
89 } finally { 89 } finally {
......
...@@ -101,10 +101,10 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -101,10 +101,10 @@ public class DistributedLeadershipManager implements LeadershipService {
101 101
102 @Activate 102 @Activate
103 public void activate() { 103 public void activate() {
104 - lockMap = storageService.createConsistentMap("onos-leader-locks", new Serializer() { 104 + lockMap = storageService.<String, NodeId>consistentMapBuilder()
105 - KryoNamespace kryo = new KryoNamespace.Builder() 105 + .withName("onos-leader-locks")
106 - .register(KryoNamespaces.API).build(); 106 + .withSerializer(new Serializer() {
107 - 107 + KryoNamespace kryo = new KryoNamespace.Builder().register(KryoNamespaces.API).build();
108 @Override 108 @Override
109 public <T> byte[] encode(T object) { 109 public <T> byte[] encode(T object) {
110 return kryo.serialize(object); 110 return kryo.serialize(object);
...@@ -114,7 +114,8 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -114,7 +114,8 @@ public class DistributedLeadershipManager implements LeadershipService {
114 public <T> T decode(byte[] bytes) { 114 public <T> T decode(byte[] bytes) {
115 return kryo.deserialize(bytes); 115 return kryo.deserialize(bytes);
116 } 116 }
117 - }); 117 + })
118 + .withPartitionsDisabled().build();
118 119
119 localNodeId = clusterService.getLocalNode().id(); 120 localNodeId = clusterService.getLocalNode().id();
120 121
......
...@@ -25,58 +25,63 @@ import java.util.concurrent.CompletableFuture; ...@@ -25,58 +25,63 @@ import java.util.concurrent.CompletableFuture;
25 import java.util.concurrent.CopyOnWriteArrayList; 25 import java.util.concurrent.CopyOnWriteArrayList;
26 import java.util.concurrent.atomic.AtomicBoolean; 26 import java.util.concurrent.atomic.AtomicBoolean;
27 import java.util.concurrent.atomic.AtomicInteger; 27 import java.util.concurrent.atomic.AtomicInteger;
28 +import java.util.stream.Collectors;
28 29
29 import org.onosproject.store.service.UpdateOperation; 30 import org.onosproject.store.service.UpdateOperation;
30 import org.onosproject.store.service.Versioned; 31 import org.onosproject.store.service.Versioned;
31 32
32 -import com.google.common.collect.ImmutableMap;
33 import com.google.common.collect.Lists; 33 import com.google.common.collect.Lists;
34 import com.google.common.collect.Maps; 34 import com.google.common.collect.Maps;
35 import com.google.common.collect.Sets; 35 import com.google.common.collect.Sets;
36 36
37 -import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator; 37 +import net.kuujo.copycat.Task;
38 - 38 +import net.kuujo.copycat.cluster.Cluster;
39 import static com.google.common.base.Preconditions.checkState; 39 import static com.google.common.base.Preconditions.checkState;
40 40
41 /** 41 /**
42 * A database that partitions the keys across one or more database partitions. 42 * A database that partitions the keys across one or more database partitions.
43 */ 43 */
44 -public class PartitionedDatabase implements DatabaseProxy<String, byte[]>, PartitionedDatabaseManager { 44 +public class PartitionedDatabase implements Database {
45 45
46 - private Partitioner<String> partitioner; 46 + private final String name;
47 - private final ClusterCoordinator coordinator; 47 + private final Partitioner<String> partitioner;
48 - private final Map<String, Database> partitions = Maps.newConcurrentMap(); 48 + private final List<Database> partitions;
49 private final AtomicBoolean isOpen = new AtomicBoolean(false); 49 private final AtomicBoolean isOpen = new AtomicBoolean(false);
50 - private static final String DB_NOT_OPEN = "Database is not open"; 50 + private static final String DB_NOT_OPEN = "Partitioned Database is not open";
51 51
52 - protected PartitionedDatabase(ClusterCoordinator coordinator) { 52 + public PartitionedDatabase(
53 - this.coordinator = coordinator; 53 + String name,
54 + Collection<Database> partitions) {
55 + this.name = name;
56 + this.partitions = partitions
57 + .stream()
58 + .sorted((db1, db2) -> db1.name().compareTo(db2.name()))
59 + .collect(Collectors.toList());
60 + this.partitioner = new SimpleKeyHashPartitioner(this.partitions);
61 + }
62 +
63 + /**
64 + * Returns the databases for individual partitions.
65 + * @return list of database partitions
66 + */
67 + public List<Database> getPartitions() {
68 + return partitions;
54 } 69 }
55 70
56 /** 71 /**
57 * Returns true if the database is open. 72 * Returns true if the database is open.
58 * @return true if open, false otherwise 73 * @return true if open, false otherwise
59 */ 74 */
75 + @Override
60 public boolean isOpen() { 76 public boolean isOpen() {
61 return isOpen.get(); 77 return isOpen.get();
62 } 78 }
63 79
64 @Override 80 @Override
65 - public void registerPartition(String name, Database partition) {
66 - partitions.put(name, partition);
67 - }
68 -
69 - @Override
70 - public Map<String, Database> getRegisteredPartitions() {
71 - return ImmutableMap.copyOf(partitions);
72 - }
73 -
74 - @Override
75 public CompletableFuture<Integer> size(String tableName) { 81 public CompletableFuture<Integer> size(String tableName) {
76 checkState(isOpen.get(), DB_NOT_OPEN); 82 checkState(isOpen.get(), DB_NOT_OPEN);
77 AtomicInteger totalSize = new AtomicInteger(0); 83 AtomicInteger totalSize = new AtomicInteger(0);
78 return CompletableFuture.allOf(partitions 84 return CompletableFuture.allOf(partitions
79 - .values()
80 .stream() 85 .stream()
81 .map(p -> p.size(tableName).thenApply(totalSize::addAndGet)) 86 .map(p -> p.size(tableName).thenApply(totalSize::addAndGet))
82 .toArray(CompletableFuture[]::new)) 87 .toArray(CompletableFuture[]::new))
...@@ -100,7 +105,6 @@ public class PartitionedDatabase implements DatabaseProxy<String, byte[]>, Parti ...@@ -100,7 +105,6 @@ public class PartitionedDatabase implements DatabaseProxy<String, byte[]>, Parti
100 checkState(isOpen.get(), DB_NOT_OPEN); 105 checkState(isOpen.get(), DB_NOT_OPEN);
101 AtomicBoolean containsValue = new AtomicBoolean(false); 106 AtomicBoolean containsValue = new AtomicBoolean(false);
102 return CompletableFuture.allOf(partitions 107 return CompletableFuture.allOf(partitions
103 - .values()
104 .stream() 108 .stream()
105 .map(p -> p.containsValue(tableName, value).thenApply(v -> containsValue.compareAndSet(false, v))) 109 .map(p -> p.containsValue(tableName, value).thenApply(v -> containsValue.compareAndSet(false, v)))
106 .toArray(CompletableFuture[]::new)) 110 .toArray(CompletableFuture[]::new))
...@@ -129,7 +133,6 @@ public class PartitionedDatabase implements DatabaseProxy<String, byte[]>, Parti ...@@ -129,7 +133,6 @@ public class PartitionedDatabase implements DatabaseProxy<String, byte[]>, Parti
129 public CompletableFuture<Void> clear(String tableName) { 133 public CompletableFuture<Void> clear(String tableName) {
130 checkState(isOpen.get(), DB_NOT_OPEN); 134 checkState(isOpen.get(), DB_NOT_OPEN);
131 return CompletableFuture.allOf(partitions 135 return CompletableFuture.allOf(partitions
132 - .values()
133 .stream() 136 .stream()
134 .map(p -> p.clear(tableName)) 137 .map(p -> p.clear(tableName))
135 .toArray(CompletableFuture[]::new)); 138 .toArray(CompletableFuture[]::new));
...@@ -140,7 +143,6 @@ public class PartitionedDatabase implements DatabaseProxy<String, byte[]>, Parti ...@@ -140,7 +143,6 @@ public class PartitionedDatabase implements DatabaseProxy<String, byte[]>, Parti
140 checkState(isOpen.get(), DB_NOT_OPEN); 143 checkState(isOpen.get(), DB_NOT_OPEN);
141 Set<String> keySet = Sets.newConcurrentHashSet(); 144 Set<String> keySet = Sets.newConcurrentHashSet();
142 return CompletableFuture.allOf(partitions 145 return CompletableFuture.allOf(partitions
143 - .values()
144 .stream() 146 .stream()
145 .map(p -> p.keySet(tableName).thenApply(keySet::addAll)) 147 .map(p -> p.keySet(tableName).thenApply(keySet::addAll))
146 .toArray(CompletableFuture[]::new)) 148 .toArray(CompletableFuture[]::new))
...@@ -152,7 +154,6 @@ public class PartitionedDatabase implements DatabaseProxy<String, byte[]>, Parti ...@@ -152,7 +154,6 @@ public class PartitionedDatabase implements DatabaseProxy<String, byte[]>, Parti
152 checkState(isOpen.get(), DB_NOT_OPEN); 154 checkState(isOpen.get(), DB_NOT_OPEN);
153 List<Versioned<byte[]>> values = new CopyOnWriteArrayList<>(); 155 List<Versioned<byte[]>> values = new CopyOnWriteArrayList<>();
154 return CompletableFuture.allOf(partitions 156 return CompletableFuture.allOf(partitions
155 - .values()
156 .stream() 157 .stream()
157 .map(p -> p.values(tableName).thenApply(values::addAll)) 158 .map(p -> p.values(tableName).thenApply(values::addAll))
158 .toArray(CompletableFuture[]::new)) 159 .toArray(CompletableFuture[]::new))
...@@ -164,7 +165,6 @@ public class PartitionedDatabase implements DatabaseProxy<String, byte[]>, Parti ...@@ -164,7 +165,6 @@ public class PartitionedDatabase implements DatabaseProxy<String, byte[]>, Parti
164 checkState(isOpen.get(), DB_NOT_OPEN); 165 checkState(isOpen.get(), DB_NOT_OPEN);
165 Set<Entry<String, Versioned<byte[]>>> entrySet = Sets.newConcurrentHashSet(); 166 Set<Entry<String, Versioned<byte[]>>> entrySet = Sets.newConcurrentHashSet();
166 return CompletableFuture.allOf(partitions 167 return CompletableFuture.allOf(partitions
167 - .values()
168 .stream() 168 .stream()
169 .map(p -> p.entrySet(tableName).thenApply(entrySet::addAll)) 169 .map(p -> p.entrySet(tableName).thenApply(entrySet::addAll))
170 .toArray(CompletableFuture[]::new)) 170 .toArray(CompletableFuture[]::new))
...@@ -225,32 +225,47 @@ public class PartitionedDatabase implements DatabaseProxy<String, byte[]>, Parti ...@@ -225,32 +225,47 @@ public class PartitionedDatabase implements DatabaseProxy<String, byte[]>, Parti
225 } 225 }
226 226
227 @Override 227 @Override
228 - public void setPartitioner(Partitioner<String> partitioner) { 228 + public CompletableFuture<Database> open() {
229 - this.partitioner = partitioner; 229 + return CompletableFuture.allOf(partitions
230 - }
231 -
232 - @Override
233 - public CompletableFuture<PartitionedDatabase> open() {
234 - return coordinator.open().thenCompose(c -> CompletableFuture.allOf(partitions
235 - .values()
236 .stream() 230 .stream()
237 .map(Database::open) 231 .map(Database::open)
238 .toArray(CompletableFuture[]::new)) 232 .toArray(CompletableFuture[]::new))
239 .thenApply(v -> { 233 .thenApply(v -> {
240 isOpen.set(true); 234 isOpen.set(true);
241 - return this; })); 235 + return this; });
242 -
243 } 236 }
244 237
245 @Override 238 @Override
246 public CompletableFuture<Void> close() { 239 public CompletableFuture<Void> close() {
247 checkState(isOpen.get(), DB_NOT_OPEN); 240 checkState(isOpen.get(), DB_NOT_OPEN);
248 - CompletableFuture<Void> closePartitions = CompletableFuture.allOf(partitions 241 + return CompletableFuture.allOf(partitions
249 - .values()
250 .stream() 242 .stream()
251 .map(database -> database.close()) 243 .map(database -> database.close())
252 .toArray(CompletableFuture[]::new)); 244 .toArray(CompletableFuture[]::new));
253 - CompletableFuture<Void> closeCoordinator = coordinator.close(); 245 + }
254 - return closePartitions.thenCompose(v -> closeCoordinator); 246 +
247 + @Override
248 + public boolean isClosed() {
249 + return !isOpen.get();
250 + }
251 +
252 + @Override
253 + public String name() {
254 + return name;
255 + }
256 +
257 + @Override
258 + public Cluster cluster() {
259 + throw new UnsupportedOperationException();
260 + }
261 +
262 + @Override
263 + public Database addStartupTask(Task<CompletableFuture<Void>> task) {
264 + throw new UnsupportedOperationException();
265 + }
266 +
267 + @Override
268 + public Database addShutdownTask(Task<CompletableFuture<Void>> task) {
269 + throw new UnsupportedOperationException();
255 } 270 }
256 } 271 }
...\ No newline at end of file ...\ No newline at end of file
......
1 -/*
2 - * Copyright 2015 Open Networking Laboratory
3 - *
4 - * Licensed under the Apache License, Version 2.0 (the "License");
5 - * you may not use this file except in compliance with the License.
6 - * You may obtain a copy of the License at
7 - *
8 - * http://www.apache.org/licenses/LICENSE-2.0
9 - *
10 - * Unless required by applicable law or agreed to in writing, software
11 - * distributed under the License is distributed on an "AS IS" BASIS,
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 - * See the License for the specific language governing permissions and
14 - * limitations under the License.
15 - */
16 -
17 -package org.onosproject.store.consistent.impl;
18 -
19 -import java.util.Collections;
20 -import java.util.HashMap;
21 -import java.util.Map;
22 -
23 -/**
24 - * Partitioned database configuration.
25 - */
26 -public class PartitionedDatabaseConfig {
27 - private final Map<String, DatabaseConfig> partitions = new HashMap<>();
28 -
29 - /**
30 - * Returns the configuration for all partitions.
31 - * @return partition map to configuartion mapping.
32 - */
33 - public Map<String, DatabaseConfig> partitions() {
34 - return Collections.unmodifiableMap(partitions);
35 - }
36 -
37 - /**
38 - * Adds the specified partition name and configuration.
39 - * @param name partition name.
40 - * @param config partition config
41 - * @return this instance
42 - */
43 - public PartitionedDatabaseConfig addPartition(String name, DatabaseConfig config) {
44 - partitions.put(name, config);
45 - return this;
46 - }
47 -}
1 -/*
2 - * Copyright 2015 Open Networking Laboratory
3 - *
4 - * Licensed under the Apache License, Version 2.0 (the "License");
5 - * you may not use this file except in compliance with the License.
6 - * You may obtain a copy of the License at
7 - *
8 - * http://www.apache.org/licenses/LICENSE-2.0
9 - *
10 - * Unless required by applicable law or agreed to in writing, software
11 - * distributed under the License is distributed on an "AS IS" BASIS,
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 - * See the License for the specific language governing permissions and
14 - * limitations under the License.
15 - */
16 -
17 -package org.onosproject.store.consistent.impl;
18 -
19 -import java.util.Map;
20 -import java.util.concurrent.CompletableFuture;
21 -import java.util.concurrent.Executors;
22 -
23 -import net.kuujo.copycat.CopycatConfig;
24 -import net.kuujo.copycat.cluster.ClusterConfig;
25 -import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator;
26 -import net.kuujo.copycat.cluster.internal.coordinator.DefaultClusterCoordinator;
27 -import net.kuujo.copycat.util.concurrent.NamedThreadFactory;
28 -
29 -/**
30 - * Manages a PartitionedDatabase.
31 - */
32 -public interface PartitionedDatabaseManager {
33 - /**
34 - * Opens the database.
35 - *
36 - * @return A completable future to be completed with the result once complete.
37 - */
38 - CompletableFuture<PartitionedDatabase> open();
39 -
40 - /**
41 - * Closes the database.
42 - *
43 - * @return A completable future to be completed with the result once complete.
44 - */
45 - CompletableFuture<Void> close();
46 -
47 - /**
48 - * Sets the partitioner to use for mapping keys to partitions.
49 - *
50 - * @param partitioner partitioner
51 - */
52 - void setPartitioner(Partitioner<String> partitioner);
53 -
54 - /**
55 - * Registers a new partition.
56 - *
57 - * @param partitionName partition name.
58 - * @param partition partition.
59 - */
60 - void registerPartition(String partitionName, Database partition);
61 -
62 - /**
63 - * Returns all the registered database partitions.
64 - *
65 - * @return mapping of all registered database partitions.
66 - */
67 - Map<String, Database> getRegisteredPartitions();
68 -
69 -
70 - /**
71 - * Creates a new partitioned database.
72 - *
73 - * @param name The database name.
74 - * @param clusterConfig The cluster configuration.
75 - * @param partitionedDatabaseConfig The database configuration.
76 -
77 - * @return The database.
78 - */
79 - public static PartitionedDatabase create(
80 - String name,
81 - ClusterConfig clusterConfig,
82 - PartitionedDatabaseConfig partitionedDatabaseConfig) {
83 - CopycatConfig copycatConfig = new CopycatConfig()
84 - .withName(name)
85 - .withClusterConfig(clusterConfig)
86 - .withDefaultSerializer(new DatabaseSerializer())
87 - .withDefaultExecutor(Executors.newSingleThreadExecutor(new NamedThreadFactory("copycat-coordinator-%d")));
88 - ClusterCoordinator coordinator = new DefaultClusterCoordinator(copycatConfig.resolve());
89 - PartitionedDatabase partitionedDatabase = new PartitionedDatabase(coordinator);
90 - partitionedDatabaseConfig.partitions().forEach((partitionName, partitionConfig) ->
91 - partitionedDatabase.registerPartition(partitionName ,
92 - coordinator.getResource(partitionName, partitionConfig.resolve(clusterConfig)
93 - .withSerializer(copycatConfig.getDefaultSerializer())
94 - .withDefaultExecutor(copycatConfig.getDefaultExecutor()))));
95 - partitionedDatabase.setPartitioner(
96 - new SimpleKeyHashPartitioner(partitionedDatabase.getRegisteredPartitions()));
97 - return partitionedDatabase;
98 - }
99 -}
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
16 16
17 package org.onosproject.store.consistent.impl; 17 package org.onosproject.store.consistent.impl;
18 18
19 -import java.util.Map; 19 +import java.util.List;
20 20
21 /** 21 /**
22 * A simple Partitioner for mapping keys to database partitions. 22 * A simple Partitioner for mapping keys to database partitions.
...@@ -27,12 +27,12 @@ import java.util.Map; ...@@ -27,12 +27,12 @@ import java.util.Map;
27 */ 27 */
28 public class SimpleKeyHashPartitioner extends DatabasePartitioner { 28 public class SimpleKeyHashPartitioner extends DatabasePartitioner {
29 29
30 - public SimpleKeyHashPartitioner(Map<String, Database> partitionMap) { 30 + public SimpleKeyHashPartitioner(List<Database> partitions) {
31 - super(partitionMap); 31 + super(partitions);
32 } 32 }
33 33
34 @Override 34 @Override
35 public Database getPartition(String tableName, String key) { 35 public Database getPartition(String tableName, String key) {
36 - return sortedPartitions[hash(key) % sortedPartitions.length]; 36 + return partitions.get(hash(key) % partitions.size());
37 } 37 }
38 } 38 }
...\ No newline at end of file ...\ No newline at end of file
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
16 16
17 package org.onosproject.store.consistent.impl; 17 package org.onosproject.store.consistent.impl;
18 18
19 -import java.util.Map; 19 +import java.util.List;
20 20
21 /** 21 /**
22 * A simple Partitioner that uses the table name hash to 22 * A simple Partitioner that uses the table name hash to
...@@ -28,12 +28,12 @@ import java.util.Map; ...@@ -28,12 +28,12 @@ import java.util.Map;
28 */ 28 */
29 public class SimpleTableHashPartitioner extends DatabasePartitioner { 29 public class SimpleTableHashPartitioner extends DatabasePartitioner {
30 30
31 - public SimpleTableHashPartitioner(Map<String, Database> partitionMap) { 31 + public SimpleTableHashPartitioner(List<Database> partitions) {
32 - super(partitionMap); 32 + super(partitions);
33 } 33 }
34 34
35 @Override 35 @Override
36 public Database getPartition(String tableName, String key) { 36 public Database getPartition(String tableName, String key) {
37 - return sortedPartitions[hash(tableName) % sortedPartitions.length]; 37 + return partitions.get(hash(tableName) % partitions.size());
38 } 38 }
39 } 39 }
...\ No newline at end of file ...\ No newline at end of file
......