Yuta HIGUCHI

CopyCat: Dynamic cluster support

Change-Id: I887c52b35811abf37a2b59db034b07ccf01eed2c
......@@ -8,17 +8,21 @@ import java.util.List;
import net.kuujo.copycat.Copycat;
import net.kuujo.copycat.StateMachine;
import net.kuujo.copycat.cluster.ClusterConfig;
import net.kuujo.copycat.cluster.TcpCluster;
import net.kuujo.copycat.cluster.TcpClusterConfig;
import net.kuujo.copycat.cluster.TcpMember;
import net.kuujo.copycat.log.InMemoryLog;
import net.kuujo.copycat.log.Log;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ClusterEvent;
import org.onlab.onos.cluster.ClusterEventListener;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.store.service.DatabaseAdminService;
......@@ -35,8 +39,6 @@ import org.onlab.onos.store.service.WriteRequest;
import org.onlab.onos.store.service.WriteResult;
import org.slf4j.Logger;
import com.google.common.collect.Lists;
/**
* Strongly consistent and durable state management service based on
* Copycat implementation of Raft consensus protocol.
......@@ -58,17 +60,29 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
private Copycat copycat;
private DatabaseClient client;
// TODO: check if synchronization is required to read/modify this
private ClusterConfig<TcpMember> clusterConfig;
private ClusterEventListener clusterEventListener;
@Activate
public void activate() {
log.info("Starting.");
// TODO: Not every node can be part of the consensus ring.
// TODO: Not every node should be part of the consensus ring.
final ControllerNode localNode = clusterService.getLocalNode();
TcpMember localMember =
new TcpMember(
clusterService.getLocalNode().ip().toString(),
clusterService.getLocalNode().tcpPort());
List<TcpMember> remoteMembers = Lists.newArrayList();
localNode.ip().toString(),
localNode.tcpPort());
clusterConfig = new TcpClusterConfig();
clusterConfig.setLocalMember(localMember);
List<TcpMember> remoteMembers = new ArrayList<>(clusterService.getNodes().size());
clusterEventListener = new InternalClusterEventListener();
clusterService.addListener(clusterEventListener);
for (ControllerNode node : clusterService.getNodes()) {
TcpMember member = new TcpMember(node.ip().toString(), node.tcpPort());
......@@ -76,21 +90,18 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
remoteMembers.add(member);
}
}
clusterConfig.addRemoteMembers(remoteMembers);
// Configure the cluster.
TcpClusterConfig config = new TcpClusterConfig();
log.info("Starting cluster with Local:[{}], Remote:{}", localMember, remoteMembers);
config.setLocalMember(localMember);
config.setRemoteMembers(remoteMembers.toArray(new TcpMember[]{}));
// Create the cluster.
TcpCluster cluster = new TcpCluster(config);
TcpCluster cluster = new TcpCluster(clusterConfig);
StateMachine stateMachine = new DatabaseStateMachine();
ControllerNode thisNode = clusterService.getLocalNode();
// FIXME resolve Chronicle + OSGi issue
//Log consensusLog = new ChronicleLog(LOG_FILE_PREFIX + "_" + thisNode.id());
Log consensusLog = new InMemoryLog();
Log consensusLog = new KryoRegisteredInMemoryLog();
copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol);
copycat.start();
......@@ -102,6 +113,7 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
@Deactivate
public void deactivate() {
clusterService.removeListener(clusterEventListener);
copycat.stop();
log.info("Stopped.");
}
......@@ -179,6 +191,46 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
}
private final class InternalClusterEventListener
implements ClusterEventListener {
@Override
public void event(ClusterEvent event) {
// TODO: Not every node should be part of the consensus ring.
final ControllerNode node = event.subject();
final TcpMember tcpMember = new TcpMember(node.ip().toString(),
node.tcpPort());
log.trace("{}", event);
switch (event.type()) {
case INSTANCE_ACTIVATED:
case INSTANCE_ADDED:
log.info("{} was added to the cluster", tcpMember);
clusterConfig.addRemoteMember(tcpMember);
break;
case INSTANCE_DEACTIVATED:
case INSTANCE_REMOVED:
log.info("{} was removed from the cluster", tcpMember);
clusterConfig.removeRemoteMember(tcpMember);
break;
default:
break;
}
log.info("Current cluster: {}", clusterConfig.getMembers());
}
}
public static final class KryoRegisteredInMemoryLog extends InMemoryLog {
public KryoRegisteredInMemoryLog() {
super();
// required to deserialize object across bundles
super.kryo.register(TcpMember.class, new TcpMemberSerializer());
super.kryo.register(TcpClusterConfig.class, new TcpClusterConfigSerializer());
}
}
private class DatabaseOperationResult<R, E extends DatabaseException> implements OptionalResult<R, E> {
private final R result;
......
package org.onlab.onos.store.service.impl;
import java.util.Collection;
import net.kuujo.copycat.cluster.TcpClusterConfig;
import net.kuujo.copycat.cluster.TcpMember;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
public class TcpClusterConfigSerializer extends Serializer<TcpClusterConfig> {
@Override
public void write(Kryo kryo, Output output, TcpClusterConfig object) {
kryo.writeClassAndObject(output, object.getLocalMember());
kryo.writeClassAndObject(output, object.getRemoteMembers());
}
@Override
public TcpClusterConfig read(Kryo kryo, Input input,
Class<TcpClusterConfig> type) {
TcpMember localMember = (TcpMember) kryo.readClassAndObject(input);
@SuppressWarnings("unchecked")
Collection<TcpMember> remoteMembers = (Collection<TcpMember>) kryo.readClassAndObject(input);
return new TcpClusterConfig(localMember, remoteMembers);
}
}
package org.onlab.onos.store.service.impl;
import net.kuujo.copycat.cluster.TcpMember;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
public class TcpMemberSerializer extends Serializer<TcpMember> {
@Override
public void write(Kryo kryo, Output output, TcpMember object) {
output.writeString(object.host());
output.writeInt(object.port());
}
@Override
public TcpMember read(Kryo kryo, Input input, Class<TcpMember> type) {
String host = input.readString();
int port = input.readInt();
return new TcpMember(host, port);
}
}