Yuta HIGUCHI

DatabaseService subsystem: add admin commands, etc.

Change-Id: I24124579f5e0b03ccbf35a03230ae5a7aff95f22
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.onos.cli;
import org.apache.karaf.shell.commands.Argument;
import org.apache.karaf.shell.commands.Command;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.service.DatabaseAdminService;
import org.onlab.packet.IpAddress;
/**
 * Adds a new member node to the tablet.
 */
@Command(scope = "onos", name = "tablet-add",
         description = "Adds a new member to tablet")
public class TabletAddCommand extends AbstractShellCommand {

    @Argument(index = 0, name = "nodeId", description = "Node ID",
              required = true, multiValued = false)
    String nodeId = null;

    // TODO context aware completer to get IP from ClusterService?
    @Argument(index = 1, name = "ip", description = "Node IP address",
              required = true, multiValued = false)
    String ip = null;

    @Argument(index = 2, name = "tcpPort", description = "Node TCP listen port",
              required = false, multiValued = false)
    int tcpPort = 9876;

    // TODO add tablet name argument when we support multiple tablets

    /**
     * Builds a controller node description from the command arguments and
     * hands it to the database admin service as a new member.
     */
    @Override
    protected void execute() {
        // Look up the service first so a missing service is reported
        // before any argument parsing takes place.
        DatabaseAdminService adminService = get(DatabaseAdminService.class);

        NodeId id = new NodeId(nodeId);
        IpAddress address = IpAddress.valueOf(ip);
        ControllerNode member = new DefaultControllerNode(id, address, tcpPort);

        adminService.addMember(member);
    }
}
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.onos.cli;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.karaf.shell.commands.Command;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.store.service.DatabaseAdminService;
import java.util.Collections;
import java.util.List;
import static com.google.common.collect.Lists.newArrayList;
/**
 * Lists the member nodes of the tablet.
 */
@Command(scope = "onos", name = "tablet-member",
         description = "Lists all member nodes")
public class TabletMemberCommand extends AbstractShellCommand {

    // TODO add tablet name argument when we support multiple tablets

    /**
     * Prints the tablet members, sorted with {@code Comparators.NODE_COMPARATOR},
     * either as JSON or as one text line per member. In text mode the entry
     * equal to the local node is flagged with a trailing {@code *}.
     */
    @Override
    protected void execute() {
        DatabaseAdminService adminService = get(DatabaseAdminService.class);
        ClusterService cluster = get(ClusterService.class);

        List<ControllerNode> members = newArrayList(adminService.listMembers());
        Collections.sort(members, Comparators.NODE_COMPARATOR);

        if (outputJson()) {
            print("%s", toJson(members));
            return;
        }

        ControllerNode localNode = cluster.getLocalNode();
        for (ControllerNode member : members) {
            String marker;
            if (member.equals(localNode)) {
                marker = "*";
            } else {
                marker = "";
            }
            print("id=%s, address=%s:%s %s",
                  member.id(), member.ip(), member.tcpPort(), marker);
        }
    }

    /**
     * Renders the given members as a JSON array of
     * {@code {id, ip, tcpPort}} objects.
     *
     * @param members members to render, in output order
     * @return JSON array with one object per member
     */
    private JsonNode toJson(List<ControllerNode> members) {
        ObjectMapper mapper = new ObjectMapper();
        ArrayNode array = mapper.createArrayNode();
        for (ControllerNode member : members) {
            JsonNode entry = mapper.createObjectNode()
                    .put("id", member.id().toString())
                    .put("ip", member.ip().toString())
                    .put("tcpPort", member.tcpPort());
            array.add(entry);
        }
        return array;
    }
}
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.onos.cli;
import org.apache.karaf.shell.commands.Argument;
import org.apache.karaf.shell.commands.Command;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.service.DatabaseAdminService;
/**
 * Removes a member node from the tablet.
 */
@Command(scope = "onos", name = "tablet-remove",
         description = "Removes a member from tablet")
public class TabletRemoveCommand extends AbstractShellCommand {

    @Argument(index = 0, name = "nodeId", description = "Node ID",
              required = true, multiValued = false)
    String nodeId = null;

    // TODO add tablet name argument when we support multiple tablets

    /**
     * Resolves the node ID through the cluster service and asks the database
     * admin service to remove that node. When the cluster service does not
     * know the ID, nothing is removed and the operator is told so.
     */
    @Override
    protected void execute() {
        DatabaseAdminService service = get(DatabaseAdminService.class);
        ClusterService clusterService = get(ClusterService.class);

        ControllerNode node = clusterService.getNode(new NodeId(nodeId));
        if (node == null) {
            // Report the miss instead of returning silently, so a mistyped
            // ID is not mistaken for a successful removal.
            print("Node %s is not known to the cluster; no member removed", nodeId);
            return;
        }
        service.removeMember(node);
    }
}
......@@ -20,6 +20,26 @@
<action class="org.onlab.onos.cli.SummaryCommand"/>
</command>
<command>
<action class="org.onlab.onos.cli.TabletMemberCommand"/>
</command>
<command>
<action class="org.onlab.onos.cli.TabletAddCommand"/>
<completers>
<ref component-id="nodeIdCompleter"/>
<null/>
<null/>
</completers>
</command>
<command>
<action class="org.onlab.onos.cli.TabletRemoveCommand"/>
<completers>
<ref component-id="nodeIdCompleter"/>
<null/>
<null/>
</completers>
</command>
<command>
<action class="org.onlab.onos.cli.NodesListCommand"/>
</command>
<command>
......
package org.onlab.onos.store.service;
import java.util.Collection;
import java.util.List;
import org.onlab.onos.cluster.ControllerNode;
/**
* Service interface for running administrative tasks on a Database.
*/
......@@ -32,4 +35,26 @@ public interface DatabaseAdminService {
* Deletes all tables from the database.
*/
public void dropAllTables();
/**
 * Adds a member to the default tablet.
 *
 * @param node node to add
 */
public void addMember(ControllerNode node);
/**
 * Removes a member from the default tablet.
 *
 * @param node node to remove
 */
public void removeMember(ControllerNode node);
/**
 * Lists the members forming the default tablet.
 *
 * @return copied collection of the members forming the default tablet
 */
public Collection<ControllerNode> listMembers();
}
......
......@@ -169,7 +169,9 @@ public class ClusterMessagingProtocol
/**
 * Creates a protocol client that talks to the given member.
 * <p>
 * The member's host and port are resolved to a controller node through
 * {@code getControllerNode}; creation fails fast when no node is found.
 *
 * @param member member to create the client for
 * @return client connecting the local node to the resolved remote node
 */
@Override
public ProtocolClient createClient(TcpMember member) {
    ControllerNode remoteNode = getControllerNode(member.host(), member.port());
    // Single check that names the offending endpoint. A preceding check
    // without host:port would fire first and hide this message.
    checkNotNull(remoteNode,
                 "A valid controller node is expected for %s:%s",
                 member.host(), member.port());
    return new ClusterMessagingProtocolClient(
            clusterCommunicator, clusterService.getLocalNode(), remoteNode);
}
......
......@@ -2,21 +2,30 @@ package org.onlab.onos.store.service.impl;
import static org.slf4j.LoggerFactory.getLogger;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import net.kuujo.copycat.Copycat;
import net.kuujo.copycat.StateMachine;
import net.kuujo.copycat.cluster.ClusterConfig;
import net.kuujo.copycat.cluster.Member;
import net.kuujo.copycat.cluster.TcpCluster;
import net.kuujo.copycat.cluster.TcpClusterConfig;
import net.kuujo.copycat.cluster.TcpMember;
import net.kuujo.copycat.log.InMemoryLog;
import net.kuujo.copycat.log.Log;
import org.apache.commons.lang3.RandomUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -27,6 +36,8 @@ import org.onlab.onos.cluster.ClusterEvent;
import org.onlab.onos.cluster.ClusterEventListener;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.service.DatabaseAdminService;
import org.onlab.onos.store.service.DatabaseException;
import org.onlab.onos.store.service.DatabaseService;
......@@ -38,8 +49,12 @@ import org.onlab.onos.store.service.ReadResult;
import org.onlab.onos.store.service.WriteAborted;
import org.onlab.onos.store.service.WriteRequest;
import org.onlab.onos.store.service.WriteResult;
import org.onlab.packet.IpAddress;
import org.slf4j.Logger;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
/**
* Strongly consistent and durable state management service based on
* Copycat implementation of Raft consensus protocol.
......@@ -56,7 +71,19 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DatabaseProtocolService copycatMessagingProtocol;
public static final String LOG_FILE_PREFIX = "/tmp/onos-copy-cat-log";
public static final String LOG_FILE_PREFIX = "/tmp/onos-copy-cat-log_";
// Current working dir seems to be /opt/onos/apache-karaf-3.0.2
// TODO: Get the path to /opt/onos/config
private static final String CONFIG_DIR = "../config";
private static final String DEFAULT_MEMBER_FILE = "tablets.json";
private static final String DEFAULT_TABLET = "default";
// TODO: make this configurable
// initial member configuration file path
private String initialMemberConfig = DEFAULT_MEMBER_FILE;
private Copycat copycat;
private DatabaseClient client;
......@@ -65,49 +92,75 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
private ClusterConfig<TcpMember> clusterConfig;
private CountDownLatch clusterEventLatch;
private ClusterEventListener clusterEventListener;
private Map<String, Set<DefaultControllerNode>> tabletMembers;
private boolean autoAddMember = false;
@Activate
public void activate() {
// TODO: Not every node should be part of the consensus ring.
final ControllerNode localNode = clusterService.getLocalNode();
TcpMember localMember =
new TcpMember(
localNode.ip().toString(),
localNode.tcpPort());
// load tablet configuration
File file = new File(CONFIG_DIR, initialMemberConfig);
log.info("Loading config: {}", file.getAbsolutePath());
TabletDefinitionStore tabletDef = new TabletDefinitionStore(file);
try {
tabletMembers = tabletDef.read();
} catch (IOException e) {
log.error("Failed to load tablet config {}", file);
throw new IllegalStateException("Failed to load tablet config", e);
}
// load default tablet configuration and start copycat
clusterConfig = new TcpClusterConfig();
clusterConfig.setLocalMember(localMember);
List<TcpMember> remoteMembers = new ArrayList<>(clusterService.getNodes().size());
Set<DefaultControllerNode> defaultMember = tabletMembers.get(DEFAULT_TABLET);
if (defaultMember == null || defaultMember.isEmpty()) {
log.error("No member found in [{}] tablet configuration.",
DEFAULT_TABLET);
throw new IllegalStateException("No member found in tablet configuration");
clusterEventLatch = new CountDownLatch(1);
clusterEventListener = new InternalClusterEventListener();
clusterService.addListener(clusterEventListener);
}
// note: from this point beyond, clusterConfig requires synchronization
final ControllerNode localNode = clusterService.getLocalNode();
TcpMember clientHandler = null;
for (ControllerNode member : defaultMember) {
final TcpMember tcpMember = new TcpMember(member.ip().toString(),
member.tcpPort());
if (localNode.equals(member)) {
clientHandler = tcpMember;
clusterConfig.setLocalMember(tcpMember);
} else {
clusterConfig.addRemoteMember(tcpMember);
}
}
for (ControllerNode node : clusterService.getNodes()) {
TcpMember member = new TcpMember(node.ip().toString(), node.tcpPort());
if (!member.equals(localMember)) {
remoteMembers.add(member);
// TODO should be removed after DatabaseClient refactoring
if (clientHandler == null) {
Set<TcpMember> members = clusterConfig.getMembers();
if (members.isEmpty()) {
log.error("No member found in [{}] tablet configuration.",
DEFAULT_TABLET);
throw new IllegalStateException("No member found in tablet configuration");
}
int position = RandomUtils.nextInt(0, members.size());
clientHandler = Iterables.get(members, position);
}
if (remoteMembers.isEmpty()) {
log.info("This node is the only node in the cluster. "
+ "Waiting for others to show up.");
// FIXME: hack trying to relax cases forming multiple consensus rings.
// add seed node configuration to avoid this
// note: from this point beyond, clusterConfig requires synchronization
clusterEventLatch = new CountDownLatch(1);
clusterEventListener = new InternalClusterEventListener();
clusterService.addListener(clusterEventListener);
// If the node is alone on its own, wait some time
// hoping others will come up soon
if (clusterService.getNodes().size() < clusterConfig.getMembers().size()) {
// current cluster size smaller than expected
try {
if (!clusterEventLatch.await(120, TimeUnit.SECONDS)) {
log.info("Starting as single node cluster");
log.info("Starting with {}/{} nodes cluster",
clusterService.getNodes().size(),
clusterConfig.getMembers().size());
}
} catch (InterruptedException e) {
log.info("Interrupted waiting for others", e);
......@@ -116,8 +169,6 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
final TcpCluster cluster;
synchronized (clusterConfig) {
clusterConfig.addRemoteMembers(remoteMembers);
// Create the cluster.
cluster = new TcpCluster(clusterConfig);
}
......@@ -131,7 +182,8 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol);
copycat.start();
client = new DatabaseClient(copycatMessagingProtocol.createClient(localMember));
// FIXME Redo DatabaseClient. Needs fall back mechanism etc.
client = new DatabaseClient(copycatMessagingProtocol.createClient(clientHandler));
log.info("Started.");
}
......@@ -233,22 +285,34 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
final TcpMember tcpMember = new TcpMember(node.ip().toString(),
node.tcpPort());
log.trace("{}", event);
switch (event.type()) {
case INSTANCE_ACTIVATED:
case INSTANCE_ADDED:
log.info("{} was added to the cluster", tcpMember);
synchronized (clusterConfig) {
clusterConfig.addRemoteMember(tcpMember);
if (autoAddMember) {
synchronized (clusterConfig) {
if (!clusterConfig.getMembers().contains(tcpMember)) {
log.info("{} was automatically added to the cluster", tcpMember);
clusterConfig.addRemoteMember(tcpMember);
}
}
}
break;
case INSTANCE_DEACTIVATED:
case INSTANCE_REMOVED:
// FIXME to be replaced with admin interface
// log.info("{} was removed from the cluster", tcpMember);
// synchronized (clusterConfig) {
// clusterConfig.removeRemoteMember(tcpMember);
// }
if (autoAddMember) {
Set<DefaultControllerNode> members
= tabletMembers.getOrDefault(DEFAULT_TABLET,
Collections.emptySet());
// remove only if not the initial members
if (!members.contains(node)) {
synchronized (clusterConfig) {
if (clusterConfig.getMembers().contains(tcpMember)) {
log.info("{} was automatically removed from the cluster", tcpMember);
clusterConfig.removeRemoteMember(tcpMember);
}
}
}
}
break;
default:
break;
......@@ -307,4 +371,58 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
}
}
}
/**
 * Adds the given node to the cluster configuration as a remote member.
 * <p>
 * A node whose host and port are already part of the configuration is left
 * untouched, matching the membership check done by the cluster event
 * listener before it adds a member.
 *
 * @param node node to add
 */
@Override
public void addMember(final ControllerNode node) {
    final TcpMember tcpMember = new TcpMember(node.ip().toString(),
                                              node.tcpPort());
    synchronized (clusterConfig) {
        if (clusterConfig.getMembers().contains(tcpMember)) {
            // Skip existing members so the log stays truthful and a member
            // already in the configuration is not registered a second time.
            log.info("{} is already a member of the cluster", tcpMember);
            return;
        }
        clusterConfig.addRemoteMember(tcpMember);
    }
    log.info("{} was added to the cluster", tcpMember);
}
/**
 * Removes the given node from the remote members of the cluster
 * configuration.
 * <p>
 * A node whose host and port are not part of the configuration is ignored,
 * matching the membership check done by the cluster event listener before
 * it removes a member.
 *
 * @param node node to remove
 */
@Override
public void removeMember(final ControllerNode node) {
    final TcpMember tcpMember = new TcpMember(node.ip().toString(),
                                              node.tcpPort());
    synchronized (clusterConfig) {
        if (!clusterConfig.getMembers().contains(tcpMember)) {
            // Nothing to remove; do not claim a removal in the log.
            log.info("{} is not a member of the cluster", tcpMember);
            return;
        }
        clusterConfig.removeRemoteMember(tcpMember);
    }
    log.info("{} was removed from the cluster", tcpMember);
}
/**
 * Lists the members of the running copycat cluster as controller nodes.
 * <p>
 * Returns an empty list while copycat has not been created. Only
 * {@link TcpMember}s are considered; a member whose address and port match
 * no node known to the cluster service is logged and left out.
 *
 * @return collection of the resolved member nodes
 */
@Override
public Collection<ControllerNode> listMembers() {
    if (copycat == null) {
        return ImmutableList.of();
    }

    final Set<ControllerNode> resolved = new HashSet<>();
    for (Member candidate : copycat.cluster().members()) {
        if (!(candidate instanceof TcpMember)) {
            continue;
        }
        final TcpMember endpoint = (TcpMember) candidate;
        // TODO assuming tcpMember#host to be IP address,
        // but if not lookup DNS, etc. first
        final IpAddress address = IpAddress.valueOf(endpoint.host());
        final int port = endpoint.port();

        final NodeId nodeId = getNodeIdFromIp(address, port);
        if (nodeId != null) {
            resolved.add(new DefaultControllerNode(nodeId, address, port));
        } else {
            log.info("No NodeId found for {}:{}", address, port);
        }
    }
    return resolved;
}
/**
 * Finds the ID of the first node known to the cluster service that has the
 * given address and TCP port.
 *
 * @param ip address to look for
 * @param tcpPort TCP port to look for
 * @return ID of the matching node, or null when no node matches
 */
private NodeId getNodeIdFromIp(IpAddress ip, int tcpPort) {
    return clusterService.getNodes().stream()
            .filter(candidate -> candidate.ip().equals(ip)
                    && candidate.tcpPort() == tcpPort)
            .findFirst()
            .map(ControllerNode::id)
            .orElse(null);
}
}
......
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.onos.store.service.impl;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.packet.IpAddress;
import org.slf4j.Logger;
import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
/**
 * Allows for reading and writing tablet definition as a JSON file.
 * <p>
 * The file holds a single JSON object. Each field name is a tablet name and
 * each field value is an array of member definitions of the form
 * {@code {"id": ..., "ip": ..., "tcpPort": ...}}.
 */
public class TabletDefinitionStore {

    private final Logger log = getLogger(getClass());

    private final File file;

    /**
     * Creates a reader/writer of the tablet definition file.
     *
     * @param filePath location of the definition file
     */
    public TabletDefinitionStore(String filePath) {
        file = new File(filePath);
    }

    /**
     * Creates a reader/writer of the tablet definition file.
     *
     * @param filePath location of the definition file
     */
    public TabletDefinitionStore(File filePath) {
        file = checkNotNull(filePath);
    }

    /**
     * Returns the Map from tablet name to set of initial member nodes.
     * <p>
     * A member definition without a {@code tcpPort} field gets port 9876.
     *
     * @return Map from tablet name to set of initial member nodes
     * @throws IOException when I/O exception of some sort has occurred.
     */
    public Map<String, Set<DefaultControllerNode>> read() throws IOException {
        final Map<String, Set<DefaultControllerNode>> tablets = new HashMap<>();

        final ObjectMapper mapper = new ObjectMapper();
        final ObjectNode tabletNodes = (ObjectNode) mapper.readTree(file);
        final Iterator<Entry<String, JsonNode>> fields = tabletNodes.fields();
        while (fields.hasNext()) {
            final Entry<String, JsonNode> next = fields.next();
            final Set<DefaultControllerNode> nodes = new HashSet<>();
            final Iterator<JsonNode> elements = next.getValue().elements();
            while (elements.hasNext()) {
                ObjectNode nodeDef = (ObjectNode) elements.next();
                // path() yields a "missing" node rather than null, so the
                // asInt default really applies when tcpPort is absent.
                nodes.add(new DefaultControllerNode(new NodeId(nodeDef.get("id").asText()),
                                                    IpAddress.valueOf(nodeDef.get("ip").asText()),
                                                    nodeDef.path("tcpPort").asInt(9876)));
            }
            tablets.put(next.getKey(), nodes);
        }
        return tablets;
    }

    /**
     * Updates the Map from tablet name to set of member nodes.
     * <p>
     * The current file content is loaded first, the entry for the given
     * tablet is replaced, and the whole definition is written back. When the
     * current content cannot be read, an empty definition is assumed.
     *
     * @param tabletName name of the tablet to update
     * @param nodes set of initial member nodes
     * @throws IOException when I/O exception of some sort has occurred.
     */
    public void write(String tabletName, Set<DefaultControllerNode> nodes) throws IOException {
        checkNotNull(tabletName);
        // The name must be NON-empty; the message states exactly that.
        checkArgument(!tabletName.isEmpty(), "Tablet name cannot be empty");
        // Fail before touching the file rather than midway through serialization.
        checkNotNull(nodes);
        // TODO should validate if tabletName is allowed in JSON

        // load current
        Map<String, Set<DefaultControllerNode>> config;
        try {
            config = read();
        } catch (IOException e) {
            log.info("Reading tablet config failed, assuming empty definition.");
            config = new HashMap<>();
        }
        // update with specified
        config.put(tabletName, nodes);

        // write back to file
        final ObjectMapper mapper = new ObjectMapper();
        final ObjectNode tabletNodes = mapper.createObjectNode();
        for (Entry<String, Set<DefaultControllerNode>> tablet : config.entrySet()) {
            ArrayNode nodeDefs = mapper.createArrayNode();
            tabletNodes.set(tablet.getKey(), nodeDefs);

            for (DefaultControllerNode node : tablet.getValue()) {
                ObjectNode nodeDef = mapper.createObjectNode();
                nodeDef.put("id", node.id().toString())
                       .put("ip", node.ip().toString())
                       .put("tcpPort", node.tcpPort());
                nodeDefs.add(nodeDef);
            }
        }
        // The generator owns the file handle; close it so the output is
        // flushed and the handle released even when writing fails.
        try (JsonGenerator generator =
                     new JsonFactory().createGenerator(file, JsonEncoding.UTF8)) {
            mapper.writeTree(generator, tabletNodes);
        }
    }
}
The onos-config command copies the files contained in this directory to the ONOS instances according to the cell definition.
......@@ -25,3 +25,18 @@ ssh $remote "
>> $ONOS_INSTALL_DIR/$KARAF_DIST/etc/system.properties
"
scp -q $CDEF_FILE $remote:$ONOS_INSTALL_DIR/config/
# Generate a default tablets.json from the OC* environment variables.
# Every OC<number> variable except OC1 is listed first, each followed by a
# comma; OC1 is written last without one so the array stays valid JSON.
TDEF_FILE=/tmp/tablets.json
echo "{ \"default\":[" > $TDEF_FILE
# Anchor on the variable name so OC10 and above are included and unrelated
# variables or values that merely contain "OC<digit>" are not.
for node in $(env | sort | egrep "^OC[0-9]+=" | egrep -v "^OC1=" | cut -d= -f2); do
    echo " { \"id\": \"$node\", \"ip\": \"$node\", \"tcpPort\": 9876 }," >> $TDEF_FILE
done
echo " { \"id\": \"$OC1\", \"ip\": \"$OC1\", \"tcpPort\": 9876 }" >> $TDEF_FILE
echo "]}" >> $TDEF_FILE
scp -q $TDEF_FILE $remote:$ONOS_INSTALL_DIR/config/
# copy tools/package/config/ to remote
scp -qr ${ONOS_ROOT}/tools/package/config/ $remote:$ONOS_INSTALL_DIR/
......