Yuta HIGUCHI

Add sample accessing database service to Foo

Change-Id: I514c57a278dea368448d284eb5bf0d41bb0013e3
......@@ -15,6 +15,9 @@
*/
package org.onlab.onos.foo;
import java.nio.ByteBuffer;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -33,9 +36,20 @@ import org.onlab.onos.net.device.DeviceService;
import org.onlab.onos.net.intent.IntentEvent;
import org.onlab.onos.net.intent.IntentListener;
import org.onlab.onos.net.intent.IntentService;
import org.onlab.onos.store.service.DatabaseAdminService;
import org.onlab.onos.store.service.DatabaseException;
import org.onlab.onos.store.service.DatabaseService;
import org.onlab.onos.store.service.OptionalResult;
import org.onlab.onos.store.service.PreconditionFailedException;
import org.onlab.onos.store.service.ReadRequest;
import org.onlab.onos.store.service.ReadResult;
import org.onlab.onos.store.service.WriteRequest;
import org.onlab.onos.store.service.WriteResult;
import org.slf4j.Logger;
import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
import static java.util.concurrent.Executors.newScheduledThreadPool;
/**
* Playground app component.
......@@ -57,22 +71,42 @@ public class FooComponent {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MastershipService mastershipService;
@Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY)
protected DatabaseAdminService dbAdminService;
@Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY)
protected DatabaseService dbService;
private final ClusterEventListener clusterListener = new InnerClusterListener();
private final DeviceListener deviceListener = new InnerDeviceListener();
private final IntentListener intentListener = new InnerIntentListener();
private final MastershipListener mastershipListener = new InnerMastershipListener();
private ScheduledExecutorService executor;
@Activate
public void activate() {
executor = newScheduledThreadPool(4, namedThreads("foo-executor-%d"));
clusterService.addListener(clusterListener);
deviceService.addListener(deviceListener);
intentService.addListener(intentListener);
mastershipService.addListener(mastershipListener);
if (dbService == null || dbAdminService == null) {
log.info("Couldn't find DB service");
} else {
log.info("Found DB service");
// longIncrementor();
// executor.scheduleAtFixedRate(new LongIncrementor(), 1, 10, TimeUnit.SECONDS);
// executor.scheduleAtFixedRate(new LongIncrementor(), 1, 10, TimeUnit.SECONDS);
}
log.info("Started");
}
@Deactivate
public void deactivate() {
executor.shutdown();
clusterService.removeListener(clusterListener);
deviceService.removeListener(deviceListener);
intentService.removeListener(intentListener);
......@@ -122,6 +156,65 @@ public class FooComponent {
}
}
}
private void longIncrementor() {
try {
final String someTable = "admin";
final String someKey = "long";
dbAdminService.createTable(someTable);
ReadResult read = dbService.read(ReadRequest.get(someTable, someKey));
if (!read.valueExists()) {
ByteBuffer zero = ByteBuffer.allocate(Long.BYTES).putLong(0);
try {
dbService.write(WriteRequest
.putIfAbsent(someTable,
someKey,
zero.array()));
log.info("Wrote initial value");
read = dbService.read(ReadRequest.get(someTable, someKey));
} catch (PreconditionFailedException e) {
log.info("Concurrent write detected.", e);
// concurrent write detected, read and fall through
read = dbService.read(ReadRequest.get(someTable, someKey));
if (!read.valueExists()) {
log.error("Shouldn't reach here");
}
}
}
int retry = 5;
do {
ByteBuffer prev = ByteBuffer.wrap(read.value().value());
long next = prev.getLong() + 1;
byte[] newValue = ByteBuffer.allocate(Long.BYTES).putLong(next).array();
OptionalResult<WriteResult, DatabaseException> result
= dbService.writeNothrow(WriteRequest
.putIfVersionMatches(someTable,
someKey,
newValue,
read.value().version()));
if (result.hasValidResult()) {
log.info("Write success {} -> {}", result.get().previousValue(), next);
break;
} else {
log.info("Write failed trying to write{}", next);
}
} while(retry-- > 0);
} catch (Exception e) {
log.error("Exception thrown", e);
}
}
private final class LongIncrementor implements Runnable {
@Override
public void run() {
longIncrementor();
}
}
}
......
......@@ -24,13 +24,24 @@ public interface DatabaseService {
*/
List<OptionalResult<ReadResult, DatabaseException>> batchRead(List<ReadRequest> batch);
// FIXME Give me a better name
/**
* Performs a write operation on the database.
* @param request write request
* @return write result.
* @throws DatabaseException if there is failure in execution write.
*/
WriteResult write(WriteRequest request);
OptionalResult<WriteResult, DatabaseException> writeNothrow(WriteRequest request);
/**
* Performs a write operation on the database.
* @param request write request
* @return write result.
* @throws OptimisticLockException FIXME define conditional failure
* @throws PreconditionFailedException FIXME define conditional failure
* @throws DatabaseException if there is failure in execution write.
*/
WriteResult write(WriteRequest request)/* throws OptimisticLockException, PreconditionFailedException*/;
/**
* Performs a batch write operation on the database.
......
package org.onlab.onos.store.service;
/**
* Exception that indicates a precondition failure.
* Scenarios that can cause this exception:
......
......@@ -35,6 +35,15 @@ public class ReadResult {
}
/**
* Returns true if database table contained value for the key.
*
* @return true if database table contained value for the key
*/
public boolean valueExists() {
return value != null;
}
/**
* Returns value associated with the key.
* @return non-null value if the table contains one, null otherwise.
*/
......
package org.onlab.onos.store.service;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onlab.onos.store.service.WriteRequest.Type.*;
import java.util.Objects;
......@@ -11,8 +13,13 @@ import com.google.common.base.MoreObjects;
*/
public class WriteRequest {
public static final int ANY_VERSION = -1;
private final String tableName;
private final String key;
private final Type type;
private final byte[] newValue;
private final long previousVersion;
private final byte[] oldValue;
......@@ -23,22 +30,22 @@ public class WriteRequest {
*
* @param tableName name of the table
* @param key key in the table
* @param newValue value to write
* @param newValue value to write, must not be null
* @return WriteRequest
*/
public static WriteRequest put(String tableName, String key,
byte[] newValue) {
return new WriteRequest(tableName, key, newValue, -1, null);
return new WriteRequest(PUT, tableName, key,
checkNotNull(newValue), ANY_VERSION, null);
}
// FIXME: Is there a special version value to realize putIfAbsent?
/**
* Creates a write request, which will
* put the specified value to the table if the previous version matches.
*
* @param tableName name of the table
* @param key key in the table
* @param newValue value to write
* @param newValue value to write, must not be null
* @param previousVersion previous version expected
* @return WriteRequest
*/
......@@ -46,37 +53,107 @@ public class WriteRequest {
byte[] newValue,
long previousVersion) {
checkArgument(previousVersion >= 0);
return new WriteRequest(tableName, key, newValue, previousVersion, null);
return new WriteRequest(PUT_IF_VERSION, tableName, key,
checkNotNull(newValue), previousVersion, null);
}
// FIXME: What is the behavior of oldValue=null? putIfAbsent?
/**
* Creates a write request, which will
* put the specified value to the table if the previous value matches.
*
* @param tableName name of the table
* @param key key in the table
* @param newValue value to write
* @param oldValue previous value expected
* @param newValue value to write, must not be null
* @param oldValue previous value expected, must not be null
* @return WriteRequest
*/
public static WriteRequest putIfValueMatches(String tableName, String key,
byte[] newValue,
byte[] oldValue) {
return new WriteRequest(tableName, key, newValue, -1, oldValue);
return new WriteRequest(PUT_IF_VALUE, tableName, key,
checkNotNull(newValue), ANY_VERSION,
checkNotNull(oldValue));
}
/**
* Creates a write request, which will
* put the specified value to the table if the previous value does not exist.
*
* @param tableName name of the table
* @param key key in the table
* @param newValue value to write, must not be null
* @return WriteRequest
*/
public static WriteRequest putIfAbsent(String tableName, String key,
byte[] newValue) {
return new WriteRequest(PUT_IF_ABSENT, tableName, key,
checkNotNull(newValue), ANY_VERSION, null);
}
// FIXME: How do we remove value? newValue=null?
/**
* Creates a write request, which will
* remove the specified entry from the table regardless of the previous value.
*
* @param tableName name of the table
* @param key key in the table
* @return WriteRequest
*/
public static WriteRequest remove(String tableName, String key) {
return new WriteRequest(REMOVE, tableName, key,
null, ANY_VERSION, null);
}
/**
* Creates a write request, which will
* remove the specified entry from the table if the previous version matches.
*
* @param tableName name of the table
* @param key key in the table
* @param previousVersion previous version expected
* @return WriteRequest
*/
public static WriteRequest remove(String tableName, String key,
long previousVersion) {
return new WriteRequest(REMOVE_IF_VALUE, tableName, key,
null, previousVersion, null);
}
/**
* Creates a write request, which will
* remove the specified entry from the table if the previous value matches.
*
* @param tableName name of the table
* @param key key in the table
* @param oldValue previous value expected, must not be null
* @return WriteRequest
*/
public static WriteRequest remove(String tableName, String key,
byte[] oldValue) {
return new WriteRequest(Type.REMOVE_IF_VALUE, tableName, key,
null, ANY_VERSION, checkNotNull(oldValue));
}
public enum Type {
PUT,
PUT_IF_VERSION,
PUT_IF_VALUE,
PUT_IF_ABSENT,
REMOVE,
REMOVE_IF_VERSION,
REMOVE_IF_VALUE,
}
// hidden constructor
protected WriteRequest(String tableName, String key, byte[] newValue, long previousVersion, byte[] oldValue) {
protected WriteRequest(Type type, String tableName, String key,
byte[] newValue,
long previousVersion, byte[] oldValue) {
checkArgument(tableName != null);
checkArgument(key != null);
checkArgument(newValue != null);
checkNotNull(tableName);
checkNotNull(key);
this.tableName = tableName;
this.key = key;
this.type = type;
this.newValue = newValue;
this.previousVersion = previousVersion;
this.oldValue = oldValue;
......@@ -90,6 +167,10 @@ public class WriteRequest {
return key;
}
public WriteRequest.Type type() {
return type;
}
public byte[] newValue() {
return newValue;
}
......@@ -105,6 +186,7 @@ public class WriteRequest {
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("type", type)
.add("tableName", tableName)
.add("key", key)
.add("newValue", newValue)
......@@ -113,9 +195,10 @@ public class WriteRequest {
.toString();
}
// TODO: revisit hashCode, equals condition
@Override
public int hashCode() {
return Objects.hash(key, tableName, previousVersion);
return Objects.hash(type, key, tableName, previousVersion);
}
@Override
......@@ -130,7 +213,8 @@ public class WriteRequest {
return false;
}
WriteRequest other = (WriteRequest) obj;
return Objects.equals(this.key, other.key) &&
return Objects.equals(this.type, other.type) &&
Objects.equals(this.key, other.key) &&
Objects.equals(this.tableName, other.tableName) &&
Objects.equals(this.previousVersion, other.previousVersion);
}
......
......@@ -105,6 +105,7 @@ public class ClusterMessagingProtocol
private static final KryoNamespace DATABASE = KryoNamespace.newBuilder()
.register(ReadRequest.class)
.register(WriteRequest.class)
.register(WriteRequest.Type.class)
.register(InternalReadResult.class)
.register(InternalWriteResult.class)
.register(InternalReadResult.Status.class)
......@@ -135,6 +136,7 @@ public class ClusterMessagingProtocol
byte[].class)
.build();
// serializer used for CopyCat Protocol
public static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
......
......@@ -2,6 +2,7 @@ package org.onlab.onos.store.service.impl;
import static org.slf4j.LoggerFactory.getLogger;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
......@@ -57,7 +58,7 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DatabaseProtocolService copycatMessagingProtocol;
public static final String LOG_FILE_PREFIX = "onos-copy-cat-log";
public static final String LOG_FILE_PREFIX = "/tmp/onos-copy-cat-log";
private Copycat copycat;
private DatabaseClient client;
......@@ -126,9 +127,11 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
StateMachine stateMachine = new DatabaseStateMachine();
// FIXME resolve Chronicle + OSGi issue
// Chronicle + OSGi issue
//Log consensusLog = new ChronicleLog(LOG_FILE_PREFIX + "_" + thisNode.id());
Log consensusLog = new KryoRegisteredInMemoryLog();
//Log consensusLog = new KryoRegisteredInMemoryLog();
Log consensusLog = new MapDBLog(new File(LOG_FILE_PREFIX + localNode.id()),
ClusterMessagingProtocol.SERIALIZER);
copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol);
copycat.start();
......@@ -187,8 +190,14 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
}
@Override
public WriteResult write(WriteRequest request) {
return batchWrite(Arrays.asList(request)).get(0).get();
public OptionalResult<WriteResult, DatabaseException> writeNothrow(WriteRequest request) {
return batchWrite(Arrays.asList(request)).get(0);
}
@Override
public WriteResult write(WriteRequest request)
throws OptimisticLockException, PreconditionFailedException {
return writeNothrow(request).get();
}
@Override
......@@ -199,13 +208,13 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
if (internalWriteResult.status() == InternalWriteResult.Status.NO_SUCH_TABLE) {
writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
new NoSuchTableException()));
} else if (internalWriteResult.status() == InternalWriteResult.Status.OPTIMISTIC_LOCK_FAILURE) {
} else if (internalWriteResult.status() == InternalWriteResult.Status.PREVIOUS_VERSION_MISMATCH) {
writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
new OptimisticLockException()));
} else if (internalWriteResult.status() == InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH) {
// TODO: throw a different exception?
writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
new PreconditionFailedException()));
new OptimisticLockException()));
} else if (internalWriteResult.status() == InternalWriteResult.Status.ABORTED) {
writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
new WriteAborted()));
......
......@@ -3,8 +3,10 @@ package org.onlab.onos.store.service.impl;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import net.kuujo.copycat.Command;
import net.kuujo.copycat.Query;
import net.kuujo.copycat.StateMachine;
......@@ -15,6 +17,7 @@ import org.onlab.onos.store.service.ReadResult;
import org.onlab.onos.store.service.VersionedValue;
import org.onlab.onos.store.service.WriteRequest;
import org.onlab.onos.store.service.WriteResult;
import org.onlab.onos.store.service.impl.InternalWriteResult.Status;
import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;
......@@ -32,6 +35,7 @@ public class DatabaseStateMachine implements StateMachine {
private final Logger log = getLogger(getClass());
// serializer used for snapshot
public static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
......@@ -87,6 +91,37 @@ public class DatabaseStateMachine implements StateMachine {
return results;
}
InternalWriteResult.Status checkIfApplicable(WriteRequest request,
VersionedValue value) {
switch (request.type()) {
case PUT:
return InternalWriteResult.Status.OK;
case PUT_IF_ABSENT:
if (value == null) {
return InternalWriteResult.Status.OK;
}
return InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH;
case PUT_IF_VALUE:
case REMOVE_IF_VALUE:
if (value != null && Arrays.equals(value.value(), request.oldValue())) {
return InternalWriteResult.Status.OK;
}
return InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH;
case PUT_IF_VERSION:
case REMOVE_IF_VERSION:
if (value != null && request.previousVersion() == value.version()) {
return InternalWriteResult.Status.OK;
}
return InternalWriteResult.Status.PREVIOUS_VERSION_MISMATCH;
case REMOVE:
return InternalWriteResult.Status.OK;
}
log.error("Should never reach here {}", request);
return InternalWriteResult.Status.ABORTED;
}
@Command
public List<InternalWriteResult> write(List<WriteRequest> requests) {
......@@ -100,25 +135,12 @@ public class DatabaseStateMachine implements StateMachine {
abort = true;
continue;
}
VersionedValue value = table.get(request.key());
if (value == null) {
if (request.oldValue() != null) {
validationResults.add(InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH);
abort = true;
continue;
} else if (request.previousVersion() >= 0) {
validationResults.add(InternalWriteResult.Status.OPTIMISTIC_LOCK_FAILURE);
abort = true;
continue;
}
}
if (request.previousVersion() >= 0 && value.version() != request.previousVersion()) {
validationResults.add(InternalWriteResult.Status.OPTIMISTIC_LOCK_FAILURE);
final VersionedValue value = table.get(request.key());
Status result = checkIfApplicable(request, value);
validationResults.add(result);
if (result != Status.OK) {
abort = true;
continue;
}
validationResults.add(InternalWriteResult.Status.OK);
}
List<InternalWriteResult> results = new ArrayList<>(requests.size());
......@@ -126,6 +148,7 @@ public class DatabaseStateMachine implements StateMachine {
if (abort) {
for (InternalWriteResult.Status validationResult : validationResults) {
if (validationResult == InternalWriteResult.Status.OK) {
// aborted due to applicability check failure on other request
results.add(new InternalWriteResult(InternalWriteResult.Status.ABORTED, null));
} else {
results.add(new InternalWriteResult(validationResult, null));
......@@ -141,12 +164,31 @@ public class DatabaseStateMachine implements StateMachine {
// synchronization scope is wrong.
// Whole function including applicability check needs to be protected.
// Confirm copycat's thread safety requirement for StateMachine
// TODO: If we need isolation, we need to block reads also
synchronized (table) {
VersionedValue previousValue =
table.put(request.key(), new VersionedValue(request.newValue(), state.nextVersion()));
results.add(new InternalWriteResult(
InternalWriteResult.Status.OK,
new WriteResult(request.tableName(), request.key(), previousValue)));
switch (request.type()) {
case PUT:
case PUT_IF_ABSENT:
case PUT_IF_VALUE:
case PUT_IF_VERSION:
VersionedValue newValue = new VersionedValue(request.newValue(), state.nextVersion());
VersionedValue previousValue = table.put(request.key(), newValue);
WriteResult putResult = new WriteResult(request.tableName(), request.key(), previousValue);
results.add(InternalWriteResult.ok(putResult));
break;
case REMOVE:
case REMOVE_IF_VALUE:
case REMOVE_IF_VERSION:
VersionedValue removedValue = table.remove(request.key());
WriteResult removeResult = new WriteResult(request.tableName(), request.key(), removedValue);
results.add(InternalWriteResult.ok(removeResult));
break;
default:
log.error("Invalid WriteRequest type {}", request.type());
break;
}
}
}
return results;
......
package org.onlab.onos.store.service.impl;
import java.io.Serializable;
import org.onlab.onos.store.service.ReadResult;
/**
* Result of a read operation executed on the DatabaseStateMachine.
*/
public class InternalReadResult {
@SuppressWarnings("serial")
public class InternalReadResult implements Serializable {
public enum Status {
OK,
......
......@@ -11,13 +11,17 @@ public class InternalWriteResult {
OK,
ABORTED,
NO_SUCH_TABLE,
OPTIMISTIC_LOCK_FAILURE,
PREVIOUS_VERSION_MISMATCH,
PREVIOUS_VALUE_MISMATCH
}
private final Status status;
private final WriteResult result;
public static InternalWriteResult ok(WriteResult result) {
return new InternalWriteResult(Status.OK, result);
}
public InternalWriteResult(Status status, WriteResult result) {
this.status = status;
this.result = result;
......