Madan Jampani

Added support for firing up multiple raft partitions + Workaround for an issue w…

…here db calls timeout when a raft cluster node is down.

Change-Id: I67406da34c8a96b8ab9371d4d9b14653edfd2e2d
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.cluster.impl;
import java.util.Set;
import org.onosproject.cluster.DefaultControllerNode;
import com.google.common.collect.ImmutableSet;
/**
......@@ -11,16 +24,16 @@ import com.google.common.collect.ImmutableSet;
*/
public class ClusterDefinition {
private Set<DefaultControllerNode> nodes;
private Set<NodeInfo> nodes;
private String ipPrefix;
/**
* Creates a new cluster definition.
* @param nodes cluster nodes.
* @param ipPrefix ip prefix common to all cluster nodes.
* @param nodes cluster nodes information
* @param ipPrefix ip prefix common to all cluster nodes
* @return cluster definition
*/
public static ClusterDefinition from(Set<DefaultControllerNode> nodes, String ipPrefix) {
public static ClusterDefinition from(Set<NodeInfo> nodes, String ipPrefix) {
ClusterDefinition definition = new ClusterDefinition();
definition.ipPrefix = ipPrefix;
definition.nodes = ImmutableSet.copyOf(nodes);
......@@ -28,18 +41,18 @@ public class ClusterDefinition {
}
/**
* Returns set of cluster nodes.
* @return cluster nodes.
* Returns set of cluster nodes info.
* @return cluster nodes info
*/
public Set<DefaultControllerNode> nodes() {
public Set<NodeInfo> getNodes() {
return ImmutableSet.copyOf(nodes);
}
/**
* Returns ipPrefix in dotted decimal notion.
* @return ip prefix.
* @return ip prefix
*/
public String ipPrefix() {
public String getIpPrefix() {
return ipPrefix;
}
}
\ No newline at end of file
......
......@@ -15,25 +15,12 @@
*/
package org.onosproject.store.cluster.impl;
import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.NodeId;
import org.onlab.packet.IpAddress;
import static com.google.common.base.Preconditions.checkNotNull;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
//Not used right now
/**
* Allows for reading and writing cluster definition as a JSON file.
*/
......@@ -43,54 +30,32 @@ public class ClusterDefinitionStore {
/**
* Creates a reader/writer of the cluster definition file.
*
* @param filePath location of the definition file
*/
public ClusterDefinitionStore(String filePath) {
file = new File(filePath);
}
/*
* Returns set of the controller nodes, including self.
*
* @return set of controller nodes
/**
* Returns the cluster definition.
* @return cluster definition
* @throws IOException when I/O exception of some sort has occurred
*/
public ClusterDefinition read() throws IOException {
Set<DefaultControllerNode> nodes = new HashSet<>();
ObjectMapper mapper = new ObjectMapper();
ObjectNode clusterNodeDef = (ObjectNode) mapper.readTree(file);
Iterator<JsonNode> it = ((ArrayNode) clusterNodeDef.get("nodes")).elements();
while (it.hasNext()) {
ObjectNode nodeDef = (ObjectNode) it.next();
nodes.add(new DefaultControllerNode(new NodeId(nodeDef.get("id").asText()),
IpAddress.valueOf(nodeDef.get("ip").asText()),
nodeDef.get("tcpPort").asInt(9876)));
ClusterDefinition definition = mapper.readValue(file, ClusterDefinition.class);
return definition;
}
String ipPrefix = clusterNodeDef.get("ipPrefix").asText();
return ClusterDefinition.from(nodes, ipPrefix);
}
/*
* Writes the given cluster definition.
*
* @param cluster definition
/**
* Writes the specified cluster definition to file.
* @param definition cluster definition
* @throws IOException when I/O exception of some sort has occurred
*/
public void write(ClusterDefinition definition) throws IOException {
ObjectMapper mapper = new ObjectMapper();
ObjectNode clusterNodeDef = mapper.createObjectNode();
clusterNodeDef.set("ipPrefix", new TextNode(definition.ipPrefix()));
ArrayNode nodeDefs = mapper.createArrayNode();
clusterNodeDef.set("nodes", nodeDefs);
for (DefaultControllerNode node : definition.nodes()) {
ObjectNode nodeDef = mapper.createObjectNode();
nodeDef.put("id", node.id().toString())
.put("ip", node.ip().toString())
.put("tcpPort", node.tcpPort());
nodeDefs.add(nodeDef);
}
mapper.writeTree(new JsonFactory().createGenerator(file, JsonEncoding.UTF8),
clusterNodeDef);
checkNotNull(definition);
// write back to file
final ObjectMapper mapper = new ObjectMapper();
mapper.writeValue(file, definition);
}
}
\ No newline at end of file
......
......@@ -127,7 +127,13 @@ public class ClusterManager implements ClusterService, ClusterAdminService {
try {
clusterDefinition = new ClusterDefinitionStore(clusterDefinitionFile.getPath()).read();
seedNodes = ImmutableSet.copyOf(clusterDefinition.nodes());
seedNodes = ImmutableSet.copyOf(clusterDefinition.getNodes())
.stream()
.map(nodeInfo -> new DefaultControllerNode(
new NodeId(nodeInfo.getId()),
IpAddress.valueOf(nodeInfo.getIp()),
nodeInfo.getTcpPort()))
.collect(Collectors.toSet());
} catch (IOException e) {
log.warn("Failed to read cluster definition.", e);
}
......@@ -330,7 +336,7 @@ public class ClusterManager implements ClusterService, ClusterAdminService {
Enumeration<InetAddress> inetAddresses = iface.getInetAddresses();
while (inetAddresses.hasMoreElements()) {
IpAddress ip = IpAddress.valueOf(inetAddresses.nextElement());
if (AddressUtil.matchInterface(ip.toString(), clusterDefinition.ipPrefix())) {
if (AddressUtil.matchInterface(ip.toString(), clusterDefinition.getIpPrefix())) {
return ip;
}
}
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.cluster.impl;
import static com.google.common.base.MoreObjects.toStringHelper;
import java.util.Objects;
import org.onosproject.cluster.ControllerNode;
/**
* Node info read from configuration files during bootstrap.
*/
public final class NodeInfo {
private final String id;
private final String ip;
private final int tcpPort;
private NodeInfo(String id, String ip, int port) {
this.id = id;
this.ip = ip;
this.tcpPort = port;
}
/**
* Creates a new instance.
* @param id node id
* @param ip node ip address
* @param port tcp port
* @return NodeInfo
*/
public static NodeInfo from(String id, String ip, int port) {
NodeInfo node = new NodeInfo(id, ip, port);
return node;
}
/**
* Returns the NodeInfo for a controller node.
* @param node controller node
* @return NodeInfo
*/
public static NodeInfo of(ControllerNode node) {
return NodeInfo.from(node.id().toString(), node.ip().toString(), node.tcpPort());
}
/**
* Returns node id.
* @return node id
*/
public String getId() {
return id;
}
/**
* Returns node ip.
* @return node ip
*/
public String getIp() {
return ip;
}
/**
* Returns node port.
* @return port
*/
public int getTcpPort() {
return tcpPort;
}
@Override
public int hashCode() {
return Objects.hash(id, ip, tcpPort);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o instanceof NodeInfo) {
NodeInfo that = (NodeInfo) o;
return Objects.equals(this.id, that.id) &&
Objects.equals(this.ip, that.ip) &&
Objects.equals(this.tcpPort, that.tcpPort);
}
return false;
}
@Override
public String toString() {
return toStringHelper(this)
.add("id", id)
.add("ip", ip)
.add("tcpPort", tcpPort).toString();
}
}
\ No newline at end of file
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.consistent.impl;
import java.util.Map;
import java.util.Set;
import org.onosproject.store.cluster.impl.NodeInfo;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
/**
* Partitioned database configuration.
*/
public class DatabaseDefinition {
private Map<String, Set<NodeInfo>> partitions;
private Set<NodeInfo> nodes;
/**
* Creates a new DatabaseDefinition.
* @param partitions partition map
* @param nodes set of nodes
* @return database definition
*/
public static DatabaseDefinition from(Map<String, Set<NodeInfo>> partitions, Set<NodeInfo> nodes) {
checkNotNull(partitions);
checkNotNull(nodes);
DatabaseDefinition definition = new DatabaseDefinition();
definition.partitions = ImmutableMap.copyOf(partitions);
definition.nodes = ImmutableSet.copyOf(nodes);
return definition;
}
/**
* Returns the map of database partitions.
* @return db partition map
*/
public Map<String, Set<NodeInfo>> getPartitions() {
return partitions;
}
/**
* Returns the set of nodes.
* @return nodes
*/
public Set<NodeInfo> getNodes() {
return nodes;
}
}
\ No newline at end of file
......@@ -16,39 +16,16 @@
package org.onosproject.store.consistent.impl;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.NodeId;
import org.onlab.packet.IpAddress;
import org.slf4j.Logger;
import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Maps;
/**
* Allows for reading and writing partitioned database definition as a JSON file.
*/
public class DatabaseDefinitionStore {
private final Logger log = getLogger(getClass());
private final File definitionfile;
/**
......@@ -57,7 +34,7 @@ public class DatabaseDefinitionStore {
* @param filePath location of the definition file
*/
public DatabaseDefinitionStore(String filePath) {
definitionfile = new File(filePath);
definitionfile = new File(checkNotNull(filePath));
}
/**
......@@ -70,72 +47,27 @@ public class DatabaseDefinitionStore {
}
/**
* Returns the Map from database partition name to set of initial active member nodes.
* Returns the database definition.
*
* @return Map from partition name to set of active member nodes
* @return database definition
* @throws IOException when I/O exception of some sort has occurred.
*/
public Map<String, Set<DefaultControllerNode>> read() throws IOException {
final Map<String, Set<DefaultControllerNode>> partitions = Maps.newHashMap();
final ObjectMapper mapper = new ObjectMapper();
final ObjectNode tabletNodes = (ObjectNode) mapper.readTree(definitionfile);
final Iterator<Entry<String, JsonNode>> fields = tabletNodes.fields();
while (fields.hasNext()) {
final Entry<String, JsonNode> next = fields.next();
final Set<DefaultControllerNode> nodes = new HashSet<>();
final Iterator<JsonNode> elements = next.getValue().elements();
while (elements.hasNext()) {
ObjectNode nodeDef = (ObjectNode) elements.next();
nodes.add(new DefaultControllerNode(new NodeId(nodeDef.get("id").asText()),
IpAddress.valueOf(nodeDef.get("ip").asText()),
nodeDef.get("tcpPort").asInt(DatabaseManager.COPYCAT_TCP_PORT)));
}
partitions.put(next.getKey(), nodes);
}
return partitions;
public DatabaseDefinition read() throws IOException {
ObjectMapper mapper = new ObjectMapper();
DatabaseDefinition definition = mapper.readValue(definitionfile, DatabaseDefinition.class);
return definition;
}
/**
* Updates the Map from database partition name to set of member nodes.
* Writes the specified database definition to file.
*
* @param partitionName name of the database partition to update
* @param nodes set of initial member nodes
* @param definition database definition
* @throws IOException when I/O exception of some sort has occurred.
*/
public void write(String partitionName, Set<DefaultControllerNode> nodes) throws IOException {
checkNotNull(partitionName);
checkArgument(partitionName.isEmpty(), "Partition name cannot be empty");
// load current
Map<String, Set<DefaultControllerNode>> config;
try {
config = read();
} catch (IOException e) {
log.info("Reading partition config failed, assuming empty definition.");
config = new HashMap<>();
}
// update with specified
config.put(partitionName, nodes);
public void write(DatabaseDefinition definition) throws IOException {
checkNotNull(definition);
// write back to file
final ObjectMapper mapper = new ObjectMapper();
final ObjectNode partitionNodes = mapper.createObjectNode();
for (Entry<String, Set<DefaultControllerNode>> tablet : config.entrySet()) {
ArrayNode nodeDefs = mapper.createArrayNode();
partitionNodes.set(tablet.getKey(), nodeDefs);
for (DefaultControllerNode node : tablet.getValue()) {
ObjectNode nodeDef = mapper.createObjectNode();
nodeDef.put("id", node.id().toString())
.put("ip", node.ip().toString())
.put("tcpPort", node.tcpPort());
nodeDefs.add(nodeDef);
}
}
mapper.writeTree(new JsonFactory().createGenerator(definitionfile, JsonEncoding.UTF8),
partitionNodes);
mapper.writeValue(definitionfile, definition);
}
}
......
......@@ -31,8 +31,7 @@ import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.store.cluster.impl.NodeInfo;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.PartitionInfo;
import org.onosproject.store.service.Serializer;
......@@ -69,8 +68,8 @@ public class DatabaseManager implements StorageService, StorageAdminService {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
protected String nodeToUri(ControllerNode node) {
return String.format("tcp://%s:%d", node.ip(), COPYCAT_TCP_PORT);
protected String nodeToUri(NodeInfo node) {
return String.format("tcp://%s:%d", node.getIp(), COPYCAT_TCP_PORT);
}
@Activate
......@@ -82,12 +81,11 @@ public class DatabaseManager implements StorageService, StorageAdminService {
File file = new File(CONFIG_DIR, PARTITION_DEFINITION_FILE);
log.info("Loading database definition: {}", file.getAbsolutePath());
DatabaseDefinitionStore databaseDef = new DatabaseDefinitionStore(file);
Map<String, Set<DefaultControllerNode>> partitionMap;
Map<String, Set<NodeInfo>> partitionMap;
try {
partitionMap = databaseDef.read();
DatabaseDefinitionStore databaseDef = new DatabaseDefinitionStore(file);
partitionMap = databaseDef.read().getPartitions();
} catch (IOException e) {
log.error("Failed to load database config {}", file);
throw new IllegalStateException("Failed to load database config", e);
}
......@@ -99,7 +97,7 @@ public class DatabaseManager implements StorageService, StorageAdminService {
.map(this::nodeToUri)
.toArray(String[]::new);
String localNodeUri = nodeToUri(clusterService.getLocalNode());
String localNodeUri = nodeToUri(NodeInfo.of(clusterService.getLocalNode()));
ClusterConfig clusterConfig = new ClusterConfig()
.withProtocol(new NettyTcpProtocol()
......
......@@ -39,18 +39,7 @@ ssh $remote "
# Generate a default tablets.json from the ON* environment variables
TDEF_FILE=/tmp/${remote}.tablets.json
nodes=( $(env | sort | egrep "OC[0-9]+" | cut -d= -f2) )
echo "{ \"default\":[" > $TDEF_FILE
while [ ${#nodes[@]} -gt 0 ]; do
node=${nodes[0]}
nodes=( ${nodes[@]:1} )
if [ "${#nodes[@]}" -ne "0" ]; then
echo " { \"id\": \"$node\", \"ip\": \"$node\", \"tcpPort\": 9876 }," >> $TDEF_FILE
else
echo " { \"id\": \"$node\", \"ip\": \"$node\", \"tcpPort\": 9876 }" >> $TDEF_FILE
fi
done
echo "]}" >> $TDEF_FILE
onos-gen-partitions $TDEF_FILE
scp -q $TDEF_FILE $remote:$ONOS_INSTALL_DIR/config/tablets.json
......