Showing
7 changed files
with
87 additions
and
24 deletions
... | @@ -228,76 +228,89 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService { | ... | @@ -228,76 +228,89 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService { |
228 | 228 | ||
229 | @Override | 229 | @Override |
230 | public boolean putIfAbsent(String tableName, String key, byte[] value) { | 230 | public boolean putIfAbsent(String tableName, String key, byte[] value) { |
231 | - BatchWriteRequest batchRequest = new BatchWriteRequest.Builder().putIfAbsent(tableName, key, value).build(); | 231 | + BatchWriteRequest batchRequest = new BatchWriteRequest.Builder() |
232 | + .putIfAbsent(tableName, key, value).build(); | ||
232 | WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0); | 233 | WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0); |
233 | if (writeResult.status().equals(WriteStatus.OK)) { | 234 | if (writeResult.status().equals(WriteStatus.OK)) { |
234 | return true; | 235 | return true; |
235 | } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) { | 236 | } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) { |
236 | return false; | 237 | return false; |
237 | } | 238 | } |
238 | - throw new DatabaseException("putIfAbsent failed due to status: " + writeResult.status()); | 239 | + throw new DatabaseException("putIfAbsent failed due to status: " |
240 | + + writeResult.status()); | ||
239 | } | 241 | } |
240 | 242 | ||
241 | @Override | 243 | @Override |
242 | public boolean putIfVersionMatches(String tableName, String key, | 244 | public boolean putIfVersionMatches(String tableName, String key, |
243 | byte[] value, long version) { | 245 | byte[] value, long version) { |
244 | - BatchWriteRequest batchRequest = new BatchWriteRequest.Builder().putIfVersionMatches(tableName, key, value, version).build(); | 246 | + BatchWriteRequest batchRequest = |
247 | + new BatchWriteRequest.Builder() | ||
248 | + .putIfVersionMatches(tableName, key, value, version).build(); | ||
245 | WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0); | 249 | WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0); |
246 | if (writeResult.status().equals(WriteStatus.OK)) { | 250 | if (writeResult.status().equals(WriteStatus.OK)) { |
247 | return true; | 251 | return true; |
248 | } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) { | 252 | } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) { |
249 | return false; | 253 | return false; |
250 | } | 254 | } |
251 | - throw new DatabaseException("putIfVersionMatches failed due to status: " + writeResult.status()); | 255 | + throw new DatabaseException("putIfVersionMatches failed due to status: " |
256 | + + writeResult.status()); | ||
252 | } | 257 | } |
253 | 258 | ||
254 | @Override | 259 | @Override |
255 | public boolean putIfValueMatches(String tableName, String key, | 260 | public boolean putIfValueMatches(String tableName, String key, |
256 | byte[] oldValue, byte[] newValue) { | 261 | byte[] oldValue, byte[] newValue) { |
257 | - BatchWriteRequest batchRequest = new BatchWriteRequest.Builder().putIfValueMatches(tableName, key, oldValue, newValue).build(); | 262 | + BatchWriteRequest batchRequest = new BatchWriteRequest.Builder() |
263 | + .putIfValueMatches(tableName, key, oldValue, newValue).build(); | ||
258 | WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0); | 264 | WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0); |
259 | if (writeResult.status().equals(WriteStatus.OK)) { | 265 | if (writeResult.status().equals(WriteStatus.OK)) { |
260 | return true; | 266 | return true; |
261 | } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) { | 267 | } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) { |
262 | return false; | 268 | return false; |
263 | } | 269 | } |
264 | - throw new DatabaseException("putIfValueMatches failed due to status: " + writeResult.status()); | 270 | + throw new DatabaseException("putIfValueMatches failed due to status: " |
271 | + + writeResult.status()); | ||
265 | } | 272 | } |
266 | 273 | ||
267 | @Override | 274 | @Override |
268 | public VersionedValue remove(String tableName, String key) { | 275 | public VersionedValue remove(String tableName, String key) { |
269 | - BatchWriteRequest batchRequest = new BatchWriteRequest.Builder().remove(tableName, key).build(); | 276 | + BatchWriteRequest batchRequest = new BatchWriteRequest.Builder() |
277 | + .remove(tableName, key).build(); | ||
270 | WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0); | 278 | WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0); |
271 | if (writeResult.status().equals(WriteStatus.OK)) { | 279 | if (writeResult.status().equals(WriteStatus.OK)) { |
272 | return writeResult.previousValue(); | 280 | return writeResult.previousValue(); |
273 | } | 281 | } |
274 | - throw new DatabaseException("remove failed due to status: " + writeResult.status()); | 282 | + throw new DatabaseException("remove failed due to status: " |
283 | + + writeResult.status()); | ||
275 | } | 284 | } |
276 | 285 | ||
277 | @Override | 286 | @Override |
278 | public boolean removeIfVersionMatches(String tableName, String key, | 287 | public boolean removeIfVersionMatches(String tableName, String key, |
279 | long version) { | 288 | long version) { |
280 | - BatchWriteRequest batchRequest = new BatchWriteRequest.Builder().removeIfVersionMatches(tableName, key, version).build(); | 289 | + BatchWriteRequest batchRequest = new BatchWriteRequest.Builder() |
290 | + .removeIfVersionMatches(tableName, key, version).build(); | ||
281 | WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0); | 291 | WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0); |
282 | if (writeResult.status().equals(WriteStatus.OK)) { | 292 | if (writeResult.status().equals(WriteStatus.OK)) { |
283 | return true; | 293 | return true; |
284 | } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) { | 294 | } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) { |
285 | return false; | 295 | return false; |
286 | } | 296 | } |
287 | - throw new DatabaseException("removeIfVersionMatches failed due to status: " + writeResult.status()); | 297 | + throw new DatabaseException("removeIfVersionMatches failed due to status: " |
298 | + + writeResult.status()); | ||
288 | } | 299 | } |
289 | 300 | ||
290 | @Override | 301 | @Override |
291 | public boolean removeIfValueMatches(String tableName, String key, | 302 | public boolean removeIfValueMatches(String tableName, String key, |
292 | byte[] value) { | 303 | byte[] value) { |
293 | - BatchWriteRequest batchRequest = new BatchWriteRequest.Builder().removeIfValueMatches(tableName, key, value).build(); | 304 | + BatchWriteRequest batchRequest = new BatchWriteRequest.Builder() |
305 | + .removeIfValueMatches(tableName, key, value).build(); | ||
294 | WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0); | 306 | WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0); |
295 | if (writeResult.status().equals(WriteStatus.OK)) { | 307 | if (writeResult.status().equals(WriteStatus.OK)) { |
296 | return true; | 308 | return true; |
297 | } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) { | 309 | } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) { |
298 | return false; | 310 | return false; |
299 | } | 311 | } |
300 | - throw new DatabaseException("removeIfValueMatches failed due to status: " + writeResult.status()); | 312 | + throw new DatabaseException("removeIfValueMatches failed due to status: " |
313 | + + writeResult.status()); | ||
301 | } | 314 | } |
302 | 315 | ||
303 | @Override | 316 | @Override | ... | ... |
... | @@ -47,7 +47,7 @@ public class DatabaseStateMachine implements StateMachine { | ... | @@ -47,7 +47,7 @@ public class DatabaseStateMachine implements StateMachine { |
47 | private final Logger log = getLogger(getClass()); | 47 | private final Logger log = getLogger(getClass()); |
48 | 48 | ||
49 | // message subject for database update notifications. | 49 | // message subject for database update notifications. |
50 | - public static MessageSubject DATABASE_UPDATE_EVENTS = | 50 | + public static final MessageSubject DATABASE_UPDATE_EVENTS = |
51 | new MessageSubject("database-update-events"); | 51 | new MessageSubject("database-update-events"); |
52 | 52 | ||
53 | // serializer used for snapshot | 53 | // serializer used for snapshot | ... | ... |
... | @@ -38,11 +38,16 @@ import org.onlab.onos.store.service.DatabaseService; | ... | @@ -38,11 +38,16 @@ import org.onlab.onos.store.service.DatabaseService; |
38 | import org.slf4j.Logger; | 38 | import org.slf4j.Logger; |
39 | import org.slf4j.LoggerFactory; | 39 | import org.slf4j.LoggerFactory; |
40 | 40 | ||
41 | -public class DatabaseUpdateEventHandler implements DatabaseUpdateEventListener, EventHandler<LeaderElectEvent> { | 41 | +/** |
42 | + * Database update event handler. | ||
43 | + */ | ||
44 | +public class DatabaseUpdateEventHandler implements | ||
45 | + DatabaseUpdateEventListener, EventHandler<LeaderElectEvent> { | ||
42 | 46 | ||
43 | private final Logger log = LoggerFactory.getLogger(getClass()); | 47 | private final Logger log = LoggerFactory.getLogger(getClass()); |
44 | 48 | ||
45 | - public final static MessageSubject DATABASE_UPDATES = new MessageSubject("database-update-event"); | 49 | + public static final MessageSubject DATABASE_UPDATES = |
50 | + new MessageSubject("database-update-event"); | ||
46 | 51 | ||
47 | private DatabaseService databaseService; | 52 | private DatabaseService databaseService; |
48 | private ClusterService cluster; | 53 | private ClusterService cluster; | ... | ... |
... | @@ -16,23 +16,26 @@ | ... | @@ -16,23 +16,26 @@ |
16 | 16 | ||
17 | package org.onlab.onos.store.service.impl; | 17 | package org.onlab.onos.store.service.impl; |
18 | 18 | ||
19 | +/** | ||
20 | + * Interface of database update event listeners. | ||
21 | + */ | ||
19 | public interface DatabaseUpdateEventListener { | 22 | public interface DatabaseUpdateEventListener { |
20 | 23 | ||
21 | /** | 24 | /** |
22 | - * | 25 | + * Notifies listeners of a table modified event. |
23 | - * @param event | 26 | + * @param event table modification event. |
24 | */ | 27 | */ |
25 | public void tableModified(TableModificationEvent event); | 28 | public void tableModified(TableModificationEvent event); |
26 | 29 | ||
27 | /** | 30 | /** |
28 | - * | 31 | + * Notifies listeners of a table created event. |
29 | * @param tableName | 32 | * @param tableName |
30 | * @param expirationTimeMillis | 33 | * @param expirationTimeMillis |
31 | */ | 34 | */ |
32 | public void tableCreated(String tableName, int expirationTimeMillis); | 35 | public void tableCreated(String tableName, int expirationTimeMillis); |
33 | 36 | ||
34 | /** | 37 | /** |
35 | - * | 38 | + * Notifies listeners of a table deleted event. |
36 | * @param tableName | 39 | * @param tableName |
37 | */ | 40 | */ |
38 | public void tableDeleted(String tableName); | 41 | public void tableDeleted(String tableName); | ... | ... |
... | @@ -69,7 +69,7 @@ public class DistributedLock implements Lock { | ... | @@ -69,7 +69,7 @@ public class DistributedLock implements Lock { |
69 | public boolean tryLock( | 69 | public boolean tryLock( |
70 | long waitTimeMillis, | 70 | long waitTimeMillis, |
71 | int leaseDurationMillis) { | 71 | int leaseDurationMillis) { |
72 | - if (tryLock(leaseDurationMillis) == false) { | 72 | + if (!tryLock(leaseDurationMillis)) { |
73 | CompletableFuture<Void> future = | 73 | CompletableFuture<Void> future = |
74 | lockManager.lockIfAvailable(this, waitTimeMillis, leaseDurationMillis); | 74 | lockManager.lockIfAvailable(this, waitTimeMillis, leaseDurationMillis); |
75 | try { | 75 | try { | ... | ... |
... | @@ -80,7 +80,10 @@ public class DistributedLockManager implements LockService { | ... | @@ -80,7 +80,10 @@ public class DistributedLockManager implements LockService { |
80 | throw new UnsupportedOperationException(); | 80 | throw new UnsupportedOperationException(); |
81 | } | 81 | } |
82 | 82 | ||
83 | - protected CompletableFuture<Void> lockIfAvailable(Lock lock, long waitTimeMillis, int leaseDurationMillis) { | 83 | + protected CompletableFuture<Void> lockIfAvailable( |
84 | + Lock lock, | ||
85 | + long waitTimeMillis, | ||
86 | + int leaseDurationMillis) { | ||
84 | CompletableFuture<Void> future = new CompletableFuture<>(); | 87 | CompletableFuture<Void> future = new CompletableFuture<>(); |
85 | locksToAcquire.put( | 88 | locksToAcquire.put( |
86 | lock.path(), | 89 | lock.path(), |
... | @@ -103,7 +106,9 @@ public class DistributedLockManager implements LockService { | ... | @@ -103,7 +106,9 @@ public class DistributedLockManager implements LockService { |
103 | 106 | ||
104 | if (event.type() == TableModificationEvent.Type.ROW_DELETED) { | 107 | if (event.type() == TableModificationEvent.Type.ROW_DELETED) { |
105 | List<LockRequest> existingRequests = locksToAcquire.get(path); | 108 | List<LockRequest> existingRequests = locksToAcquire.get(path); |
106 | - if (existingRequests == null) return; | 109 | + if (existingRequests == null) { |
110 | + return; | ||
111 | + } | ||
107 | 112 | ||
108 | Iterator<LockRequest> existingRequestIterator = existingRequests.iterator(); | 113 | Iterator<LockRequest> existingRequestIterator = existingRequests.iterator(); |
109 | while (existingRequestIterator.hasNext()) { | 114 | while (existingRequestIterator.hasNext()) { |
... | @@ -111,7 +116,7 @@ public class DistributedLockManager implements LockService { | ... | @@ -111,7 +116,7 @@ public class DistributedLockManager implements LockService { |
111 | if (request.expirationTime().isAfter(DateTime.now())) { | 116 | if (request.expirationTime().isAfter(DateTime.now())) { |
112 | existingRequestIterator.remove(); | 117 | existingRequestIterator.remove(); |
113 | } else { | 118 | } else { |
114 | - if (request.lock().tryLock(request.leaseDurationMillis()) == true) { | 119 | + if (request.lock().tryLock(request.leaseDurationMillis())) { |
115 | request.future().complete(null); | 120 | request.future().complete(null); |
116 | existingRequests.remove(0); | 121 | existingRequests.remove(0); |
117 | } | 122 | } | ... | ... |
1 | package org.onlab.onos.store.service.impl; | 1 | package org.onlab.onos.store.service.impl; |
2 | 2 | ||
3 | -public class TableModificationEvent { | 3 | +/** |
4 | + * A table modification event. | ||
5 | + */ | ||
6 | +public final class TableModificationEvent { | ||
4 | 7 | ||
8 | + /** | ||
9 | + * Type of table modification event. | ||
10 | + * | ||
11 | + */ | ||
5 | public enum Type { | 12 | public enum Type { |
6 | ROW_ADDED, | 13 | ROW_ADDED, |
7 | ROW_DELETED, | 14 | ROW_DELETED, |
... | @@ -12,14 +19,32 @@ public class TableModificationEvent { | ... | @@ -12,14 +19,32 @@ public class TableModificationEvent { |
12 | private final String key; | 19 | private final String key; |
13 | private final Type type; | 20 | private final Type type; |
14 | 21 | ||
22 | + /** | ||
23 | + * Creates a new row deleted table modification event. | ||
24 | + * @param tableName table name. | ||
25 | + * @param key row key | ||
26 | + * @return table modification event. | ||
27 | + */ | ||
15 | public static TableModificationEvent rowDeleted(String tableName, String key) { | 28 | public static TableModificationEvent rowDeleted(String tableName, String key) { |
16 | return new TableModificationEvent(tableName, key, Type.ROW_DELETED); | 29 | return new TableModificationEvent(tableName, key, Type.ROW_DELETED); |
17 | } | 30 | } |
18 | 31 | ||
32 | + /** | ||
33 | + * Creates a new row added table modification event. | ||
34 | + * @param tableName table name. | ||
35 | + * @param key row key | ||
36 | + * @return table modification event. | ||
37 | + */ | ||
19 | public static TableModificationEvent rowAdded(String tableName, String key) { | 38 | public static TableModificationEvent rowAdded(String tableName, String key) { |
20 | return new TableModificationEvent(tableName, key, Type.ROW_ADDED); | 39 | return new TableModificationEvent(tableName, key, Type.ROW_ADDED); |
21 | } | 40 | } |
22 | 41 | ||
42 | + /** | ||
43 | + * Creates a new row updated table modification event. | ||
44 | + * @param tableName table name. | ||
45 | + * @param key row key | ||
46 | + * @return table modification event. | ||
47 | + */ | ||
23 | public static TableModificationEvent rowUpdated(String tableName, String key) { | 48 | public static TableModificationEvent rowUpdated(String tableName, String key) { |
24 | return new TableModificationEvent(tableName, key, Type.ROW_UPDATED); | 49 | return new TableModificationEvent(tableName, key, Type.ROW_UPDATED); |
25 | } | 50 | } |
... | @@ -30,14 +55,26 @@ public class TableModificationEvent { | ... | @@ -30,14 +55,26 @@ public class TableModificationEvent { |
30 | this.type = type; | 55 | this.type = type; |
31 | } | 56 | } |
32 | 57 | ||
58 | + /** | ||
59 | + * Returns name of table this event is for. | ||
60 | + * @return table name | ||
61 | + */ | ||
33 | public String tableName() { | 62 | public String tableName() { |
34 | return tableName; | 63 | return tableName; |
35 | } | 64 | } |
36 | 65 | ||
66 | + /** | ||
67 | + * Returns the row key this event is for. | ||
68 | + * @return row key | ||
69 | + */ | ||
37 | public String key() { | 70 | public String key() { |
38 | return key; | 71 | return key; |
39 | } | 72 | } |
40 | 73 | ||
74 | + /** | ||
75 | + * Returns the type of table modification event. | ||
76 | + * @return event type. | ||
77 | + */ | ||
41 | public Type type() { | 78 | public Type type() { |
42 | return type; | 79 | return type; |
43 | } | 80 | } | ... | ... |
-
Please register or login to post a comment