Thomas Vachuska

Merge remote-tracking branch 'origin/master'

Showing 27 changed files with 1446 additions and 9 deletions
......@@ -28,11 +28,15 @@ public class DefaultEventSinkRegistryTest {
private DefaultEventSinkRegistry registry;
private static class FooEvent extends TestEvent {
public FooEvent(String subject) { super(Type.FOO, subject); }
public FooEvent(String subject) {
super(Type.FOO, subject);
}
}
private static class BarEvent extends TestEvent {
public BarEvent(String subject) { super(Type.BAR, subject); }
public BarEvent(String subject) {
super(Type.BAR, subject);
}
}
private static class FooSink implements EventSink<FooEvent> {
......
......@@ -45,6 +45,24 @@
</dependency>
<dependency>
<groupId>net.kuujo.copycat</groupId>
<artifactId>copycat</artifactId>
<version>0.4.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>net.kuujo.copycat</groupId>
<artifactId>copycat-chronicle</artifactId>
<version>0.4.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>net.kuujo.copycat</groupId>
<artifactId>copycat-tcp</artifactId>
<version>0.4.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
......
package org.onlab.onos.store.service;
import java.util.List;
/**
* Service for running administrative tasks on a Database.
*/
public interface DatabaseAdminService {
/**
* Creates a new table.
* Table creation is idempotent. Attempting to create a table
* that already exists will be a noop.
* @param name table name.
* @return true if the table was created by this call, false otherwise.
*/
public boolean createTable(String name);
/**
* Lists all the tables in the database.
* @return list of table names.
*/
public List<String> listTables();
/**
* Deletes a table from the database.
* @param name name of the table to delete.
*/
public void dropTable(String name);
/**
* Deletes all tables from the database.
*/
public void dropAllTables();
}
package org.onlab.onos.store.service;
/**
* Base exception type for database failures.
*/
@SuppressWarnings("serial")
public class DatabaseException extends RuntimeException {
public DatabaseException(String message, Throwable t) {
super(message, t);
}
public DatabaseException(String message) {
super(message);
}
public DatabaseException(Throwable t) {
super(t);
}
public DatabaseException() {
};
}
\ No newline at end of file
package org.onlab.onos.store.service;
import java.util.List;
public interface DatabaseService {
/**
* Performs a read on the database.
* @param request read request.
* @return ReadResult
* @throws DatabaseException
*/
ReadResult read(ReadRequest request);
/**
* Performs a batch read operation on the database.
* The main advantage of batch read operation is parallelization.
* @param batch batch of read requests to execute.
* @return
*/
List<OptionalResult<ReadResult, DatabaseException>> batchRead(List<ReadRequest> batch);
/**
* Performs a write operation on the database.
* @param request
* @return write result.
* @throws DatabaseException
*/
WriteResult write(WriteRequest request);
/**
* Performs a batch write operation on the database.
* Batch write provides transactional semantics. Either all operations
* succeed or none of them do.
* @param batch batch of write requests to execute as a transaction.
* @return result of executing the batch write operation.
*/
List<OptionalResult<WriteResult, DatabaseException>> batchWrite(List<WriteRequest> batch);
}
package org.onlab.onos.store.service;
/**
* Exception thrown when an operation (read or write) is requested for
* a table that does not exist.
*/
@SuppressWarnings("serial")
public class NoSuchTableException extends DatabaseException {
}
\ No newline at end of file
package org.onlab.onos.store.service;
/**
* Exception that indicates a optimistic lock failure.
*/
@SuppressWarnings("serial")
public class OptimisticLockException extends PreconditionFailedException {
}
package org.onlab.onos.store.service;
/**
* A container object which either has a result or an exception.
* <p>
* If a result is present, get() will return it otherwise get() will throw
* the exception that was encountered in the process of generating the result.
*
* @param <R> type of result.
* @param <E> exception encountered in generating the result.
*/
public interface OptionalResult<R, E extends Throwable> {
/**
* Returns the result.
* @return result
* @throws E if there is no valid result.
*/
public R get();
/**
* Returns true if there is a valid result.
* @return true is yes, false otherwise.
*/
public boolean hasValidResult();
}
package org.onlab.onos.store.service;
/**
* Exception that indicates a precondition failure.
* <ul>Scenarios that can cause this exception:
* <li>An operation that attempts to write a new value iff the current value is equal
* to some specified value.</li>
* <li>An operation that attempts to write a new value iff the current version
* matches a specified value</li>
* </ul>
*/
@SuppressWarnings("serial")
public class PreconditionFailedException extends DatabaseException {
}
package org.onlab.onos.store.service;
/**
* Database read request.
*/
public class ReadRequest {
private final String tableName;
private final String key;
public ReadRequest(String tableName, String key) {
this.tableName = tableName;
this.key = key;
}
/**
* Return the name of the table.
* @return table name.
*/
public String tableName() {
return tableName;
}
/**
* Returns the key.
* @return key.
*/
public String key() {
return key;
}
@Override
public String toString() {
return "ReadRequest [tableName=" + tableName + ", key=" + key + "]";
}
}
\ No newline at end of file
package org.onlab.onos.store.service;
import org.onlab.onos.store.service.impl.VersionedValue;
/**
* Database read result.
*/
public class ReadResult {
private final String tableName;
private final String key;
private final VersionedValue value;
public ReadResult(String tableName, String key, VersionedValue value) {
this.tableName = tableName;
this.key = key;
this.value = value;
}
/**
* Database table name.
* @return
*/
public String tableName() {
return tableName;
}
/**
* Database table key.
* @return key.
*/
public String key() {
return key;
}
/**
* value associated with the key.
* @return non-null value if the table contains one, null otherwise.
*/
public VersionedValue value() {
return value;
}
}
package org.onlab.onos.store.service;
/**
* Exception that indicates a write operation is aborted.
* Aborted operations do not mutate database state is any form.
*/
@SuppressWarnings("serial")
public class WriteAborted extends DatabaseException {
}
package org.onlab.onos.store.service;
import static com.google.common.base.Preconditions.checkArgument;
import java.util.Objects;
/**
* Database write request.
*/
public class WriteRequest {
private final String tableName;
private final String key;
private final byte[] newValue;
private final long previousVersion;
private final byte[] oldValue;
public WriteRequest(String tableName, String key, byte[] newValue) {
this(tableName, key, newValue, -1, null);
}
public WriteRequest(String tableName, String key, byte[] newValue, long previousVersion) {
this(tableName, key, newValue, previousVersion, null);
checkArgument(previousVersion >= 0);
}
public WriteRequest(String tableName, String key, byte[] newValue, byte[] oldValue) {
this(tableName, key, newValue, -1, oldValue);
}
private WriteRequest(String tableName, String key, byte[] newValue, long previousVersion, byte[] oldValue) {
checkArgument(tableName != null);
checkArgument(key != null);
checkArgument(newValue != null);
this.tableName = tableName;
this.key = key;
this.newValue = newValue;
this.previousVersion = previousVersion;
this.oldValue = oldValue;
}
public String tableName() {
return tableName;
}
public String key() {
return key;
}
public byte[] newValue() {
return newValue;
}
public long previousVersion() {
return previousVersion;
}
public byte[] oldValue() {
return oldValue;
}
@Override
public String toString() {
return "WriteRequest [tableName=" + tableName + ", key=" + key
+ ", newValue=" + newValue
+ ", previousVersion=" + previousVersion
+ ", oldValue=" + oldValue;
}
@Override
public int hashCode() {
return Objects.hash(key, tableName, previousVersion);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
WriteRequest other = (WriteRequest) obj;
return Objects.equals(this.key, other.key) &&
Objects.equals(this.tableName, other.tableName) &&
Objects.equals(this.previousVersion, other.previousVersion);
}
}
package org.onlab.onos.store.service;
import org.onlab.onos.store.service.impl.VersionedValue;
/**
* Database write result.
*/
public class WriteResult {
private final String tableName;
private final String key;
private final VersionedValue previousValue;
public WriteResult(String tableName, String key, VersionedValue previousValue) {
this.tableName = tableName;
this.key = key;
this.previousValue = previousValue;
}
public String tableName() {
return tableName;
}
public String key() {
return key;
}
public VersionedValue previousValue() {
return previousValue;
}
}
package org.onlab.onos.store.service.impl;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import net.kuujo.copycat.protocol.Response.Status;
import net.kuujo.copycat.protocol.SubmitRequest;
import net.kuujo.copycat.protocol.SubmitResponse;
import net.kuujo.copycat.spi.protocol.ProtocolClient;
import org.apache.commons.lang3.RandomUtils;
import org.onlab.netty.Endpoint;
import org.onlab.netty.NettyMessagingService;
import org.onlab.onos.store.service.DatabaseException;
import org.onlab.onos.store.service.ReadRequest;
import org.onlab.onos.store.service.WriteRequest;
public class DatabaseClient {
private final Endpoint copycatEp;
ProtocolClient client;
NettyMessagingService messagingService;
public DatabaseClient(Endpoint copycatEp) {
this.copycatEp = copycatEp;
}
private static String nextId() {
return UUID.randomUUID().toString();
}
public void activate() throws Exception {
messagingService = new NettyMessagingService(RandomUtils.nextInt(10000, 40000));
messagingService.activate();
client = new NettyProtocolClient(copycatEp, messagingService);
}
public void deactivate() throws Exception {
messagingService.deactivate();
}
public boolean createTable(String tableName) {
SubmitRequest request =
new SubmitRequest(
nextId(),
"createTable",
Arrays.asList(tableName));
CompletableFuture<SubmitResponse> future = client.submit(request);
try {
return (boolean) future.get().result();
} catch (InterruptedException | ExecutionException e) {
throw new DatabaseException(e);
}
}
public void dropTable(String tableName) {
SubmitRequest request =
new SubmitRequest(
nextId(),
"dropTable",
Arrays.asList(tableName));
CompletableFuture<SubmitResponse> future = client.submit(request);
try {
if (future.get().status() == Status.OK) {
throw new DatabaseException(future.get().toString());
}
} catch (InterruptedException | ExecutionException e) {
throw new DatabaseException(e);
}
}
public void dropAllTables() {
SubmitRequest request =
new SubmitRequest(
nextId(),
"dropAllTables",
Arrays.asList());
CompletableFuture<SubmitResponse> future = client.submit(request);
try {
if (future.get().status() != Status.OK) {
throw new DatabaseException(future.get().toString());
}
} catch (InterruptedException | ExecutionException e) {
throw new DatabaseException(e);
}
}
@SuppressWarnings("unchecked")
public List<String> listTables() {
SubmitRequest request =
new SubmitRequest(
nextId(),
"listTables",
Arrays.asList());
CompletableFuture<SubmitResponse> future = client.submit(request);
try {
return (List<String>) future.get().result();
} catch (InterruptedException | ExecutionException e) {
throw new DatabaseException(e);
}
}
@SuppressWarnings("unchecked")
public List<InternalReadResult> batchRead(List<ReadRequest> requests) {
SubmitRequest request = new SubmitRequest(
nextId(),
"read",
Arrays.asList(requests));
CompletableFuture<SubmitResponse> future = client.submit(request);
try {
List<InternalReadResult> internalReadResults = (List<InternalReadResult>) future.get().result();
return internalReadResults;
} catch (InterruptedException | ExecutionException e) {
throw new DatabaseException(e);
}
}
@SuppressWarnings("unchecked")
public List<InternalWriteResult> batchWrite(List<WriteRequest> requests) {
SubmitRequest request = new SubmitRequest(
nextId(),
"write",
Arrays.asList(requests));
CompletableFuture<SubmitResponse> future = client.submit(request);
try {
List<InternalWriteResult> internalWriteResults = (List<InternalWriteResult>) future.get().result();
return internalWriteResults;
} catch (InterruptedException | ExecutionException e) {
throw new DatabaseException(e);
}
}
}
package org.onlab.onos.store.service.impl;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import net.kuujo.copycat.Copycat;
import net.kuujo.copycat.StateMachine;
import net.kuujo.copycat.cluster.TcpCluster;
import net.kuujo.copycat.cluster.TcpClusterConfig;
import net.kuujo.copycat.cluster.TcpMember;
import net.kuujo.copycat.log.ChronicleLog;
import net.kuujo.copycat.log.Log;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.netty.Endpoint;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.store.service.DatabaseAdminService;
import org.onlab.onos.store.service.DatabaseException;
import org.onlab.onos.store.service.DatabaseService;
import org.onlab.onos.store.service.NoSuchTableException;
import org.onlab.onos.store.service.OptimisticLockException;
import org.onlab.onos.store.service.OptionalResult;
import org.onlab.onos.store.service.PreconditionFailedException;
import org.onlab.onos.store.service.ReadRequest;
import org.onlab.onos.store.service.ReadResult;
import org.onlab.onos.store.service.WriteAborted;
import org.onlab.onos.store.service.WriteRequest;
import org.onlab.onos.store.service.WriteResult;
import org.slf4j.Logger;
import com.google.common.collect.Lists;
/**
* Strongly consistent and durable state management service based on
* Copycat implementation of Raft consensus protocol.
*/
@Component(immediate = true)
@Service
public class DatabaseManager implements DatabaseService, DatabaseAdminService {
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
ClusterService clusterService;
public static final String LOG_FILE_PREFIX = "onos-copy-cat-log";
private Copycat copycat;
private DatabaseClient client;
@Activate
public void activate() {
TcpMember localMember =
new TcpMember(
clusterService.getLocalNode().ip().toString(),
clusterService.getLocalNode().tcpPort());
List<TcpMember> remoteMembers = Lists.newArrayList();
for (ControllerNode node : clusterService.getNodes()) {
TcpMember member = new TcpMember(node.ip().toString(), node.tcpPort());
if (!member.equals(localMember)) {
remoteMembers.add(member);
}
}
// Configure the cluster.
TcpClusterConfig config = new TcpClusterConfig();
config.setLocalMember(localMember);
config.setRemoteMembers(remoteMembers.toArray(new TcpMember[]{}));
// Create the cluster.
TcpCluster cluster = new TcpCluster(config);
StateMachine stateMachine = new DatabaseStateMachine();
ControllerNode thisNode = clusterService.getLocalNode();
Log consensusLog = new ChronicleLog(LOG_FILE_PREFIX + "_" + thisNode.id());
copycat = new Copycat(stateMachine, consensusLog, cluster, new NettyProtocol());
copycat.start();
client = new DatabaseClient(new Endpoint(localMember.host(), localMember.port()));
log.info("Started.");
}
@Activate
public void deactivate() {
copycat.stop();
}
@Override
public boolean createTable(String name) {
return client.createTable(name);
}
@Override
public void dropTable(String name) {
client.dropTable(name);
}
@Override
public void dropAllTables() {
client.dropAllTables();
}
@Override
public List<String> listTables() {
return client.listTables();
}
@Override
public ReadResult read(ReadRequest request) {
return batchRead(Arrays.asList(request)).get(0).get();
}
@Override
public List<OptionalResult<ReadResult, DatabaseException>> batchRead(
List<ReadRequest> batch) {
List<OptionalResult<ReadResult, DatabaseException>> readResults = new ArrayList<>(batch.size());
for (InternalReadResult internalReadResult : client.batchRead(batch)) {
if (internalReadResult.status() == InternalReadResult.Status.NO_SUCH_TABLE) {
readResults.add(new DatabaseOperationResult<ReadResult, DatabaseException>(
new NoSuchTableException()));
} else {
readResults.add(new DatabaseOperationResult<ReadResult, DatabaseException>(
internalReadResult.result()));
}
}
return readResults;
}
@Override
public WriteResult write(WriteRequest request) {
return batchWrite(Arrays.asList(request)).get(0).get();
}
@Override
public List<OptionalResult<WriteResult, DatabaseException>> batchWrite(
List<WriteRequest> batch) {
List<OptionalResult<WriteResult, DatabaseException>> writeResults = new ArrayList<>(batch.size());
for (InternalWriteResult internalWriteResult : client.batchWrite(batch)) {
if (internalWriteResult.status() == InternalWriteResult.Status.NO_SUCH_TABLE) {
writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
new NoSuchTableException()));
} else if (internalWriteResult.status() == InternalWriteResult.Status.OPTIMISTIC_LOCK_FAILURE) {
writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
new OptimisticLockException()));
} else if (internalWriteResult.status() == InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH) {
// TODO: throw a different exception?
writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
new PreconditionFailedException()));
} else if (internalWriteResult.status() == InternalWriteResult.Status.ABORTED) {
writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
new WriteAborted()));
} else {
writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
internalWriteResult.result()));
}
}
return writeResults;
}
private class DatabaseOperationResult<R, E extends DatabaseException> implements OptionalResult<R, E> {
private final R result;
private final DatabaseException exception;
public DatabaseOperationResult(R result) {
this.result = result;
this.exception = null;
}
public DatabaseOperationResult(DatabaseException exception) {
this.result = null;
this.exception = exception;
}
@Override
public R get() {
if (result != null) {
return result;
}
throw exception;
}
@Override
public boolean hasValidResult() {
return result != null;
}
@Override
public String toString() {
if (result != null) {
return result.toString();
} else {
return exception.toString();
}
}
}
}
package org.onlab.onos.store.service.impl;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import net.kuujo.copycat.Command;
import net.kuujo.copycat.Query;
import net.kuujo.copycat.StateMachine;
import org.onlab.onos.store.serializers.KryoSerializer;
import org.onlab.onos.store.service.ReadRequest;
import org.onlab.onos.store.service.ReadResult;
import org.onlab.onos.store.service.WriteRequest;
import org.onlab.onos.store.service.WriteResult;
import org.onlab.util.KryoNamespace;
import com.google.common.collect.Maps;
public class DatabaseStateMachine implements StateMachine {
public static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
.register(VersionedValue.class)
.register(State.class)
.register(NettyProtocol.COMMON)
.build()
.populate(1);
}
};
private State state = new State();
@Command
public boolean createTable(String tableName) {
return state.getTables().putIfAbsent(tableName, Maps.newHashMap()) == null;
}
@Command
public boolean dropTable(String tableName) {
return state.getTables().remove(tableName) != null;
}
@Command
public boolean dropAllTables() {
state.getTables().clear();
return true;
}
@Query
public Set<String> listTables() {
return state.getTables().keySet();
}
@Query
public List<InternalReadResult> read(List<ReadRequest> requests) {
List<InternalReadResult> results = new ArrayList<>(requests.size());
for (ReadRequest request : requests) {
Map<String, VersionedValue> table = state.getTables().get(request.tableName());
if (table == null) {
results.add(new InternalReadResult(InternalReadResult.Status.NO_SUCH_TABLE, null));
continue;
}
VersionedValue value = table.get(request.key());
results.add(new InternalReadResult(
InternalReadResult.Status.OK,
new ReadResult(
request.tableName(),
request.key(),
value)));
}
return results;
}
@Command
public List<InternalWriteResult> write(List<WriteRequest> requests) {
boolean abort = false;
List<InternalWriteResult.Status> validationResults = new ArrayList<>(requests.size());
for (WriteRequest request : requests) {
Map<String, VersionedValue> table = state.getTables().get(request.tableName());
if (table == null) {
validationResults.add(InternalWriteResult.Status.NO_SUCH_TABLE);
abort = true;
continue;
}
VersionedValue value = table.get(request.key());
if (value == null) {
if (request.oldValue() != null) {
validationResults.add(InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH);
abort = true;
continue;
} else if (request.previousVersion() >= 0) {
validationResults.add(InternalWriteResult.Status.OPTIMISTIC_LOCK_FAILURE);
abort = true;
continue;
}
}
if (request.previousVersion() >= 0 && value.version() != request.previousVersion()) {
validationResults.add(InternalWriteResult.Status.OPTIMISTIC_LOCK_FAILURE);
abort = true;
continue;
}
validationResults.add(InternalWriteResult.Status.OK);
}
List<InternalWriteResult> results = new ArrayList<>(requests.size());
if (abort) {
for (InternalWriteResult.Status validationResult : validationResults) {
if (validationResult == InternalWriteResult.Status.OK) {
results.add(new InternalWriteResult(InternalWriteResult.Status.ABORTED, null));
} else {
results.add(new InternalWriteResult(validationResult, null));
}
}
return results;
}
for (WriteRequest request : requests) {
Map<String, VersionedValue> table = state.getTables().get(request.tableName());
synchronized (table) {
VersionedValue previousValue =
table.put(request.key(), new VersionedValue(request.newValue(), state.nextVersion()));
results.add(new InternalWriteResult(
InternalWriteResult.Status.OK,
new WriteResult(request.tableName(), request.key(), previousValue)));
}
}
return results;
}
public class State {
private final Map<String, Map<String, VersionedValue>> tables =
Maps.newHashMap();
private long versionCounter = 1;
Map<String, Map<String, VersionedValue>> getTables() {
return tables;
}
long nextVersion() {
return versionCounter++;
}
}
@Override
public byte[] takeSnapshot() {
try {
return SERIALIZER.encode(state);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
@Override
public void installSnapshot(byte[] data) {
try {
this.state = SERIALIZER.decode(data);
} catch (Exception e) {
e.printStackTrace();
}
}
}
package org.onlab.onos.store.service.impl;
import org.onlab.onos.store.service.ReadResult;
public class InternalReadResult {
public enum Status {
OK,
NO_SUCH_TABLE
}
private final Status status;
private final ReadResult result;
public InternalReadResult(Status status, ReadResult result) {
this.status = status;
this.result = result;
}
public Status status() {
return status;
}
public ReadResult result() {
return result;
}
@Override
public String toString() {
return "InternalReadResult [status=" + status + ", result=" + result
+ "]";
}
}
\ No newline at end of file
package org.onlab.onos.store.service.impl;
import org.onlab.onos.store.service.WriteResult;
public class InternalWriteResult {
public enum Status {
OK,
ABORTED,
NO_SUCH_TABLE,
OPTIMISTIC_LOCK_FAILURE,
PREVIOUS_VALUE_MISMATCH
}
private final Status status;
private final WriteResult result;
public InternalWriteResult(Status status, WriteResult result) {
this.status = status;
this.result = result;
}
public Status status() {
return status;
}
public WriteResult result() {
return result;
}
}
package org.onlab.onos.store.service.impl;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Vector;
import net.kuujo.copycat.cluster.TcpClusterConfig;
import net.kuujo.copycat.cluster.TcpMember;
import net.kuujo.copycat.internal.log.ConfigurationEntry;
import net.kuujo.copycat.internal.log.CopycatEntry;
import net.kuujo.copycat.internal.log.OperationEntry;
import net.kuujo.copycat.internal.log.SnapshotEntry;
import net.kuujo.copycat.protocol.PingRequest;
import net.kuujo.copycat.protocol.PingResponse;
import net.kuujo.copycat.protocol.PollRequest;
import net.kuujo.copycat.protocol.PollResponse;
import net.kuujo.copycat.protocol.Response.Status;
import net.kuujo.copycat.protocol.SubmitRequest;
import net.kuujo.copycat.protocol.SubmitResponse;
import net.kuujo.copycat.protocol.SyncRequest;
import net.kuujo.copycat.protocol.SyncResponse;
import net.kuujo.copycat.spi.protocol.Protocol;
import net.kuujo.copycat.spi.protocol.ProtocolClient;
import net.kuujo.copycat.spi.protocol.ProtocolServer;
import org.onlab.onos.store.serializers.ImmutableListSerializer;
import org.onlab.onos.store.serializers.ImmutableMapSerializer;
import org.onlab.onos.store.serializers.ImmutableSetSerializer;
import org.onlab.onos.store.serializers.KryoSerializer;
import org.onlab.onos.store.service.ReadRequest;
import org.onlab.onos.store.service.ReadResult;
import org.onlab.onos.store.service.WriteRequest;
import org.onlab.onos.store.service.WriteResult;
import org.onlab.util.KryoNamespace;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.serializers.CollectionSerializer;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
/**
* {@link Protocol} based on {@link org.onlab.netty.NettyMessagingService}.
*/
public class NettyProtocol implements Protocol<TcpMember> {
public static final String COPYCAT_PING = "copycat-raft-consensus-ping";
public static final String COPYCAT_SYNC = "copycat-raft-consensus-sync";
public static final String COPYCAT_POLL = "copycat-raft-consensus-poll";
public static final String COPYCAT_SUBMIT = "copycat-raft-consensus-submit";
// TODO: make this configurable.
public static final long RETRY_INTERVAL_MILLIS = 2000;
private static final KryoNamespace COPYCAT = KryoNamespace.newBuilder()
.register(PingRequest.class)
.register(PingResponse.class)
.register(PollRequest.class)
.register(PollResponse.class)
.register(SyncRequest.class)
.register(SyncResponse.class)
.register(SubmitRequest.class)
.register(SubmitResponse.class)
.register(Status.class)
.register(ConfigurationEntry.class)
.register(SnapshotEntry.class)
.register(CopycatEntry.class)
.register(OperationEntry.class)
.register(TcpClusterConfig.class)
.register(TcpMember.class)
.build();
// TODO: Move to the right place.
private static final KryoNamespace CRAFT = KryoNamespace.newBuilder()
.register(ReadRequest.class)
.register(WriteRequest.class)
.register(InternalReadResult.class)
.register(InternalWriteResult.class)
.register(InternalReadResult.Status.class)
.register(WriteResult.class)
.register(ReadResult.class)
.register(InternalWriteResult.Status.class)
.register(VersionedValue.class)
.build();
public static final KryoNamespace COMMON = KryoNamespace.newBuilder()
.register(Arrays.asList().getClass(), new CollectionSerializer() {
@Override
@SuppressWarnings("rawtypes")
protected Collection<?> create(Kryo kryo, Input input, Class<Collection> type) {
return new ArrayList();
}
})
.register(ImmutableMap.class, new ImmutableMapSerializer())
.register(ImmutableList.class, new ImmutableListSerializer())
.register(ImmutableSet.class, new ImmutableSetSerializer())
.register(
Vector.class,
ArrayList.class,
Arrays.asList().getClass(),
HashMap.class,
HashSet.class,
LinkedList.class,
byte[].class)
.build();
public static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
.register(COPYCAT)
.register(COMMON)
.register(CRAFT)
.build()
.populate(1);
}
};
private NettyProtocolServer server = null;
// FIXME: This is a total hack.Assumes
// ProtocolServer is initialized before ProtocolClient
protected NettyProtocolServer getServer() {
if (server == null) {
throw new IllegalStateException("ProtocolServer is not initialized yet!");
}
return server;
}
@Override
public ProtocolServer createServer(TcpMember member) {
server = new NettyProtocolServer(member);
return server;
}
@Override
public ProtocolClient createClient(TcpMember member) {
return new NettyProtocolClient(this, member);
}
}
package org.onlab.onos.store.service.impl;
import static org.slf4j.LoggerFactory.getLogger;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import net.kuujo.copycat.cluster.TcpMember;
import net.kuujo.copycat.protocol.PingRequest;
import net.kuujo.copycat.protocol.PingResponse;
import net.kuujo.copycat.protocol.PollRequest;
import net.kuujo.copycat.protocol.PollResponse;
import net.kuujo.copycat.protocol.SubmitRequest;
import net.kuujo.copycat.protocol.SubmitResponse;
import net.kuujo.copycat.protocol.SyncRequest;
import net.kuujo.copycat.protocol.SyncResponse;
import net.kuujo.copycat.spi.protocol.ProtocolClient;
import org.onlab.netty.Endpoint;
import org.onlab.netty.NettyMessagingService;
import org.slf4j.Logger;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* {@link NettyMessagingService} based Copycat protocol client.
*/
public class NettyProtocolClient implements ProtocolClient {
private final Logger log = getLogger(getClass());
private static final ThreadFactory THREAD_FACTORY =
new ThreadFactoryBuilder().setNameFormat("copycat-netty-messaging-%d").build();
// Remote endpoint, this client instance is used
// for communicating with.
private final Endpoint remoteEp;
private final NettyMessagingService messagingService;
// TODO: Is 10 the right number of threads?
private static final ScheduledExecutorService THREAD_POOL =
new ScheduledThreadPoolExecutor(10, THREAD_FACTORY);
public NettyProtocolClient(NettyProtocol protocol, TcpMember member) {
this(new Endpoint(member.host(), member.port()), protocol.getServer().getNettyMessagingService());
}
public NettyProtocolClient(Endpoint remoteEp, NettyMessagingService messagingService) {
this.remoteEp = remoteEp;
this.messagingService = messagingService;
}
@Override
public CompletableFuture<PingResponse> ping(PingRequest request) {
return requestReply(request);
}
@Override
public CompletableFuture<SyncResponse> sync(SyncRequest request) {
return requestReply(request);
}
@Override
public CompletableFuture<PollResponse> poll(PollRequest request) {
return requestReply(request);
}
@Override
public CompletableFuture<SubmitResponse> submit(SubmitRequest request) {
return requestReply(request);
}
@Override
public CompletableFuture<Void> connect() {
return CompletableFuture.completedFuture(null);
}
@Override
public CompletableFuture<Void> close() {
return CompletableFuture.completedFuture(null);
}
public <I> String messageType(I input) {
Class<?> clazz = input.getClass();
if (clazz.equals(PollRequest.class)) {
return NettyProtocol.COPYCAT_POLL;
} else if (clazz.equals(SyncRequest.class)) {
return NettyProtocol.COPYCAT_SYNC;
} else if (clazz.equals(SubmitRequest.class)) {
return NettyProtocol.COPYCAT_SUBMIT;
} else if (clazz.equals(PingRequest.class)) {
return NettyProtocol.COPYCAT_PING;
} else {
throw new IllegalArgumentException("Unknown class " + clazz.getName());
}
}
private <I, O> CompletableFuture<O> requestReply(I request) {
CompletableFuture<O> future = new CompletableFuture<>();
THREAD_POOL.schedule(new RPCTask<I, O>(request, future), 0, TimeUnit.MILLISECONDS);
return future;
}
private class RPCTask<I, O> implements Runnable {
private final String messageType;
private final byte[] payload;
private final CompletableFuture<O> future;
public RPCTask(I request, CompletableFuture<O> future) {
this.messageType = messageType(request);
this.payload = NettyProtocol.SERIALIZER.encode(request);
this.future = future;
}
@Override
public void run() {
try {
byte[] response = messagingService
.sendAndReceive(remoteEp, messageType, payload)
.get(NettyProtocol.RETRY_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
future.complete(NettyProtocol.SERIALIZER.decode(response));
} catch (IOException | InterruptedException | ExecutionException | TimeoutException e) {
if (messageType.equals(NettyProtocol.COPYCAT_SYNC) ||
messageType.equals(NettyProtocol.COPYCAT_PING)) {
log.warn("Request to {} failed. Will retry "
+ "in {} ms", remoteEp, NettyProtocol.RETRY_INTERVAL_MILLIS);
THREAD_POOL.schedule(
this,
NettyProtocol.RETRY_INTERVAL_MILLIS,
TimeUnit.MILLISECONDS);
} else {
future.completeExceptionally(e);
}
} catch (Exception e) {
future.completeExceptionally(e);
}
}
}
}
package org.onlab.onos.store.service.impl;
import static org.slf4j.LoggerFactory.getLogger;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import net.kuujo.copycat.cluster.TcpMember;
import net.kuujo.copycat.protocol.PingRequest;
import net.kuujo.copycat.protocol.PollRequest;
import net.kuujo.copycat.protocol.RequestHandler;
import net.kuujo.copycat.protocol.SubmitRequest;
import net.kuujo.copycat.protocol.SyncRequest;
import net.kuujo.copycat.spi.protocol.ProtocolServer;
import org.onlab.netty.Message;
import org.onlab.netty.MessageHandler;
import org.onlab.netty.NettyMessagingService;
import org.slf4j.Logger;
/**
* {@link NettyMessagingService} based Copycat protocol server.
*/
public class NettyProtocolServer implements ProtocolServer {
private final Logger log = getLogger(getClass());
private final NettyMessagingService messagingService;
private RequestHandler handler;
public NettyProtocolServer(TcpMember member) {
messagingService = new NettyMessagingService(member.host(), member.port());
messagingService.registerHandler(NettyProtocol.COPYCAT_PING, new CopycatMessageHandler<PingRequest>());
messagingService.registerHandler(NettyProtocol.COPYCAT_SYNC, new CopycatMessageHandler<SyncRequest>());
messagingService.registerHandler(NettyProtocol.COPYCAT_POLL, new CopycatMessageHandler<PollRequest>());
messagingService.registerHandler(NettyProtocol.COPYCAT_SUBMIT, new CopycatMessageHandler<SubmitRequest>());
}
protected NettyMessagingService getNettyMessagingService() {
return messagingService;
}
@Override
public void requestHandler(RequestHandler handler) {
this.handler = handler;
}
@Override
public CompletableFuture<Void> listen() {
try {
messagingService.activate();
return CompletableFuture.completedFuture(null);
} catch (Exception e) {
CompletableFuture<Void> future = new CompletableFuture<>();
future.completeExceptionally(e);
return future;
}
}
@Override
public CompletableFuture<Void> close() {
CompletableFuture<Void> future = new CompletableFuture<>();
try {
messagingService.deactivate();
future.complete(null);
return future;
} catch (Exception e) {
future.completeExceptionally(e);
return future;
}
}
private class CopycatMessageHandler<T> implements MessageHandler {
@Override
public void handle(Message message) throws IOException {
T request = NettyProtocol.SERIALIZER.decode(message.payload());
if (request.getClass().equals(PingRequest.class)) {
handler.ping((PingRequest) request).whenComplete((response, error) -> {
try {
message.respond(NettyProtocol.SERIALIZER.encode(response));
} catch (Exception e) {
log.error("Failed to respond to ping request", e);
}
});
} else if (request.getClass().equals(PollRequest.class)) {
handler.poll((PollRequest) request).whenComplete((response, error) -> {
try {
message.respond(NettyProtocol.SERIALIZER.encode(response));
} catch (Exception e) {
log.error("Failed to respond to poll request", e);
}
});
} else if (request.getClass().equals(SyncRequest.class)) {
handler.sync((SyncRequest) request).whenComplete((response, error) -> {
try {
message.respond(NettyProtocol.SERIALIZER.encode(response));
} catch (Exception e) {
log.error("Failed to respond to sync request", e);
}
});
} else if (request.getClass().equals(SubmitRequest.class)) {
handler.submit((SubmitRequest) request).whenComplete((response, error) -> {
try {
message.respond(NettyProtocol.SERIALIZER.encode(response));
} catch (Exception e) {
log.error("Failed to respond to submit request", e);
}
});
}
}
}
}
package org.onlab.onos.store.service.impl;
import java.util.Arrays;
/**
* Wrapper object that holds the object (as byte array) and its version.
*/
public class VersionedValue {
private final byte[] value;
private final long version;
/**
* Creates a new instance with the specified value and version.
* @param value
* @param version
*/
public VersionedValue(byte[] value, long version) {
this.value = value;
this.version = version;
}
/**
* Returns the value.
* @return value.
*/
public byte[] value() {
return value;
}
/**
* Returns the version.
* @return version.
*/
public long version() {
return version;
}
@Override
public String toString() {
return "VersionedValue [value=" + Arrays.toString(value) + ", version="
+ version + "]";
}
}
......@@ -32,9 +32,15 @@ public class RoleReplyInfo {
this.genId = genId;
this.xid = xid;
}
public RoleState getRole() { return role; }
public U64 getGenId() { return genId; }
public long getXid() { return xid; }
public RoleState getRole() {
return role;
}
public U64 getGenId() {
return genId;
}
public long getXid() {
return xid;
}
@Override
public String toString() {
return "[Role:" + role + " GenId:" + genId + " Xid:" + xid + "]";
......
......@@ -347,7 +347,7 @@ class RoleManager implements RoleHandler {
RoleState role = null;
OFNiciraControllerRole ncr = nrr.getRole();
switch(ncr) {
switch (ncr) {
case ROLE_MASTER:
role = RoleState.MASTER;
break;
......@@ -383,7 +383,7 @@ class RoleManager implements RoleHandler {
throws SwitchStateException {
OFControllerRole cr = rrmsg.getRole();
RoleState role = null;
switch(cr) {
switch (cr) {
case ROLE_EQUAL:
role = RoleState.EQUAL;
break;
......
......@@ -414,7 +414,7 @@
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<version>2.3.7</version>
<version>2.5.3</version>
<extensions>true</extensions>
</plugin>
......@@ -493,6 +493,12 @@
<artifactId>onos-build-conf</artifactId>
<version>1.0</version>
</dependency>
<!-- For Java 8 lambda support-->
<dependency>
<groupId>com.puppycrawl.tools</groupId>
<artifactId>checkstyle</artifactId>
<version>5.9</version>
</dependency>
</dependencies>
<configuration>
<configLocation>onos/checkstyle.xml</configLocation>
......
......@@ -50,7 +50,7 @@ public class MessageDecoder extends ReplayingDecoder<DecoderState> {
ByteBuf buffer,
List<Object> out) throws Exception {
switch(state()) {
switch (state()) {
case READ_HEADER_VERSION:
int headerVersion = buffer.readInt();
checkState(headerVersion == MessageEncoder.HEADER_VERSION, "Unexpected header version");
......