Madan Jampani
Committed by Gerrit Code Review

In preparation for dynamic clustering support:

- Added Cluster metadata service and metadata store interfaces
- Added a static cluster metadata store implementation that is backed by a local file.
- Consolidated the existing cluster.json and tablets.json metadata files into a single cluster.json file that has all the cluster related metadata.
- Removed dependency on ONOS_NIC env variable.

Change-Id: Ia0a8bb69740caecdcdde71a9408be37c56ae2504
Showing 22 changed files with 798 additions and 564 deletions
......@@ -30,9 +30,8 @@ public interface ClusterAdminService {
* instance.
*
* @param nodes set of nodes that form the cluster
* @param ipPrefix IP address prefix, e.g. 10.0.1.*
*/
void formCluster(Set<ControllerNode> nodes, String ipPrefix);
void formCluster(Set<ControllerNode> nodes);
/**
* Adds a new controller node to the cluster.
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.cluster;
import java.util.Collection;
import java.util.Set;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Verify.verifyNotNull;
import static com.google.common.base.Verify.verify;
import com.google.common.base.MoreObjects;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableSet;
/**
* Cluster metadata.
* <p>
* Metadata specifies the attributes that define a ONOS cluster and comprises the collection
* of {@link org.onosproject.cluster.ControllerNode nodes} and the collection of data
* {@link org.onosproject.cluster.Partition partitions}.
*/
public final class ClusterMetadata {
private String name;
private Set<ControllerNode> nodes;
private Set<Partition> partitions;
/**
* Returns a new cluster metadata builder.
* @return The cluster metadata builder.
*/
public static Builder builder() {
return new Builder();
}
/**
* Returns the name of the cluster.
*
* @return cluster name
*/
public String getName() {
return this.name;
}
/**
* Returns the collection of {@link org.onosproject.cluster.ControllerNode nodes} that make up the cluster.
* @return cluster nodes
*/
public Collection<ControllerNode> getNodes() {
return this.nodes;
}
/**
* Returns the collection of data {@link org.onosproject.cluster.Partition partitions} that make up the cluster.
* @return collection of partitions.
*/
public Collection<Partition> getPartitions() {
return this.partitions;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(ClusterMetadata.class)
.add("name", name)
.add("nodes", nodes)
.add("partitions", partitions)
.toString();
}
/**
* Builder for a {@link ClusterMetadata} instance.
*/
public static class Builder {
private final ClusterMetadata metadata;
public Builder() {
metadata = new ClusterMetadata();
}
/**
* Sets the cluster name, returning the cluster metadata builder for method chaining.
* @param name cluster name
* @return this cluster metadata builder
*/
public Builder withName(String name) {
metadata.name = checkNotNull(name);
return this;
}
/**
* Sets the collection of cluster nodes, returning the cluster metadata builder for method chaining.
* @param controllerNodes collection of cluster nodes
* @return this cluster metadata builder
*/
public Builder withControllerNodes(Collection<ControllerNode> controllerNodes) {
metadata.nodes = ImmutableSet.copyOf(checkNotNull(controllerNodes));
return this;
}
/**
* Sets the collection of data partitions, returning the cluster metadata builder for method chaining.
* @param partitions collection of partitions
* @return this cluster metadata builder
*/
public Builder withPartitions(Collection<Partition> partitions) {
metadata.partitions = ImmutableSet.copyOf(checkNotNull(partitions));
return this;
}
/**
* Builds the cluster metadata.
* @return cluster metadata
* @throws com.google.common.base.VerifyException VerifyException if the metadata is misconfigured
*/
public ClusterMetadata build() {
verifyMetadata();
return metadata;
}
/**
* Validates the constructed metadata for semantic correctness.
* @throws VerifyException if the metadata is misconfigured.
*/
private void verifyMetadata() {
verifyNotNull(metadata.getName(), "Cluster name must be specified");
verifyNotNull(metadata.getNodes(), "Cluster nodes must be specified");
verifyNotNull(metadata.getPartitions(), "Cluster partitions must be specified");
verify(!metadata.getNodes().isEmpty(), "Cluster nodes must not be empty");
verify(!metadata.getPartitions().isEmpty(), "Cluster nodes must not be empty");
// verify that partitions are constituted from valid cluster nodes.
boolean validPartitions = Collections2.transform(metadata.getNodes(), ControllerNode::id)
.containsAll(metadata.getPartitions()
.stream()
.flatMap(r -> r.getMembers().stream())
.collect(Collectors.toSet()));
verify(validPartitions, "Partition locations must be valid cluster nodes");
}
}
}
......@@ -13,46 +13,44 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.cluster.impl;
package org.onosproject.cluster;
import java.util.Set;
import com.google.common.collect.ImmutableSet;
import org.onosproject.event.AbstractEvent;
/**
* Cluster definition.
* Describes a cluster metadata event.
*/
public class ClusterDefinition {
private Set<NodeInfo> nodes;
private String ipPrefix;
public class ClusterMetadataEvent extends AbstractEvent<ClusterMetadataEvent.Type, ClusterMetadata> {
/**
* Creates a new cluster definition.
* @param nodes cluster nodes information
* @param ipPrefix ip prefix common to all cluster nodes
* @return cluster definition
* Type of cluster metadata events.
*/
public enum Type {
/**
* Signifies that the cluster metadata has changed.
*/
public static ClusterDefinition from(Set<NodeInfo> nodes, String ipPrefix) {
ClusterDefinition definition = new ClusterDefinition();
definition.ipPrefix = ipPrefix;
definition.nodes = ImmutableSet.copyOf(nodes);
return definition;
METADATA_CHANGED,
}
/**
* Returns set of cluster nodes info.
* @return cluster nodes info
* Creates an event of a given type and for the specified metadata and the
* current time.
*
* @param type cluster metadata event type
* @param metadata cluster metadata subject
*/
public Set<NodeInfo> getNodes() {
return ImmutableSet.copyOf(nodes);
public ClusterMetadataEvent(Type type, ClusterMetadata metadata) {
super(type, metadata);
}
/**
* Returns ipPrefix in dotted decimal notion.
* @return ip prefix
* Creates an event of a given type and for the specified metadata and time.
*
* @param type cluster metadata event type
* @param metadata cluster metadata subject
* @param time occurrence time
*/
public String getIpPrefix() {
return ipPrefix;
public ClusterMetadataEvent(Type type, ClusterMetadata metadata, long time) {
super(type, metadata, time);
}
}
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.cluster;
import org.onosproject.event.EventListener;
/**
* Entity capable of receiving cluster metadata related events.
*/
public interface ClusterMetadataEventListener extends EventListener<ClusterMetadataEvent> {
}
......@@ -15,33 +15,26 @@
*/
package org.onosproject.cluster;
import java.util.Set;
/**
* Service for obtaining the static definition of a controller cluster.
* Service for obtaining metadata information about the cluster.
*/
public interface ClusterDefinitionService {
public interface ClusterMetadataService {
/**
* Returns the local controller node.
* @return local controller node
* Returns the current cluster metadata.
* @return cluster metadata
*/
ControllerNode localNode();
ClusterMetadata getClusterMetadata();
/**
* Returns the set of seed nodes that should be used for discovering other members
* of the cluster.
* @return set of seed controller nodes
* Updates the cluster metadata.
* @param metadata new metadata
*/
Set<ControllerNode> seedNodes();
void setClusterMetadata(ClusterMetadata metadata);
/**
* Forms cluster configuration based on the specified set of node
* information. Assumes subsequent restart for the new configuration to
* take hold.
*
* @param nodes set of nodes that form the cluster
* @param ipPrefix IP address prefix, e.g. 10.0.1.*
* Returns the local controller node representing this instance.
* @return local controller node
*/
void formCluster(Set<ControllerNode> nodes, String ipPrefix);
ControllerNode getLocalNode();
}
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.cluster;
import java.util.Collection;
import org.onosproject.store.Store;
import org.onosproject.store.service.Versioned;
/**
* Manages persistence of cluster metadata; not intended for direct use.
*/
public interface ClusterMetadataStore extends Store<ClusterMetadataEvent, ClusterMetadataStoreDelegate> {
/**
* Returns the cluster metadata.
* <p>
* The returned metadata is versioned to aid determining if a metadata instance is more recent than another.
* @return cluster metadata
*/
Versioned<ClusterMetadata> getClusterMetadata();
/**
* Updates the cluster metadata.
* @param metadata new metadata value
*/
void setClusterMetadata(ClusterMetadata metadata);
// TODO: The below methods should move to a separate store interface that is responsible for
// tracking cluster partition operational state.
/**
* Sets a controller node as an active member of a partition.
* <p>
* Active members are those replicas that are up to speed with the rest of the system and are
* usually capable of participating in the replica state management activities in accordance with
* the data consistency and replication protocol in use.
* @param partitionId partition identifier
* @param nodeId id of controller node
*/
void setActiveReplica(String partitionId, NodeId nodeId);
/**
* Removes a controller node as an active member for a partition.
* <p>
* Active members are those replicas that are up to speed with the rest of the system and are
* usually capable of participating in the replica state management activities in accordance with
* the data consistency and replication protocol in use.
* @param partitionId partition identifier
* @param nodeId id of controller node
*/
void unsetActiveReplica(String partitionId, NodeId nodeId);
/**
* Returns the collection of controller nodes that are the active replicas for a partition.
* <p>
* Active members are those replicas that are up to speed with the rest of the system and are
* usually capable of participating in the replica state management activities in accordance with
* the data consistency and replication protocol in use.
* @param partitionId partition identifier
* @return identifiers of controller nodes that are the active replicas
*/
Collection<NodeId> getActiveReplicas(String partitionId);
}
\ No newline at end of file
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.cluster;
import org.onosproject.store.StoreDelegate;
/**
* Cluster metadata store delegate abstraction.
*/
public interface ClusterMetadataStoreDelegate extends StoreDelegate<ClusterMetadataEvent> {
}
\ No newline at end of file
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.cluster;
import java.util.Collection;
import java.util.Set;
import com.google.common.collect.ImmutableSet;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* A data partition.
* <p>
* Partition represents a slice of the data space and is made up of a collection
* of {@link org.onosproject.cluster.ControllerNode nodes}
* that all maintain copies of this data.
*/
public class Partition {
private final String name;
private final Set<NodeId> members;
private Partition() {
name = null;
members = null;
}
public Partition(String name, Collection<NodeId> members) {
this.name = checkNotNull(name);
this.members = ImmutableSet.copyOf(checkNotNull(members));
}
/**
* Returns the partition name.
* <p>
* Each partition is identified by a unique name.
* @return partition name
*/
public String getName() {
return this.name;
}
/**
* Returns the collection of controller node identifiers that make up this partition.
* @return collection of controller node identifiers
*/
public Collection<NodeId> getMembers() {
return this.members;
}
}
\ No newline at end of file
......@@ -25,17 +25,26 @@ import org.apache.karaf.system.SystemService;
import org.joda.time.DateTime;
import org.onlab.packet.IpAddress;
import org.onosproject.cluster.ClusterAdminService;
import org.onosproject.cluster.ClusterDefinitionService;
import org.onosproject.cluster.ClusterEvent;
import org.onosproject.cluster.ClusterEventListener;
import org.onosproject.cluster.ClusterMetadata;
import org.onosproject.cluster.ClusterMetadataService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ClusterStore;
import org.onosproject.cluster.ClusterStoreDelegate;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.cluster.Partition;
import org.onosproject.event.AbstractListenerManager;
import org.slf4j.Logger;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static com.google.common.base.Preconditions.checkArgument;
......@@ -44,8 +53,6 @@ import static org.onosproject.security.AppGuard.checkPermission;
import static org.slf4j.LoggerFactory.getLogger;
import static org.onosproject.security.AppPermission.Type.*;
/**
* Implementation of the cluster service.
*/
......@@ -61,7 +68,7 @@ public class ClusterManager
private ClusterStoreDelegate delegate = new InternalStoreDelegate();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterDefinitionService clusterDefinitionService;
protected ClusterMetadataService clusterMetadataService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterStore store;
......@@ -73,7 +80,8 @@ public class ClusterManager
public void activate() {
store.setDelegate(delegate);
eventDispatcher.addSink(ClusterEvent.class, listenerRegistry);
clusterDefinitionService.seedNodes()
clusterMetadataService.getClusterMetadata()
.getNodes()
.forEach(node -> store.addNode(node.id(), node.ip(), node.tcpPort()));
log.info("Started");
}
......@@ -119,11 +127,16 @@ public class ClusterManager
}
@Override
public void formCluster(Set<ControllerNode> nodes, String ipPrefix) {
public void formCluster(Set<ControllerNode> nodes) {
checkNotNull(nodes, "Nodes cannot be null");
checkArgument(!nodes.isEmpty(), "Nodes cannot be empty");
checkNotNull(ipPrefix, "IP prefix cannot be null");
clusterDefinitionService.formCluster(nodes, ipPrefix);
ClusterMetadata metadata = ClusterMetadata.builder()
.withName("default")
.withControllerNodes(nodes)
.withPartitions(buildDefaultPartitions(nodes))
.build();
clusterMetadataService.setClusterMetadata(metadata);
try {
log.warn("Shutting down container for cluster reconfiguration!");
systemService.reboot("now", SystemService.Swipe.NONE);
......@@ -153,4 +166,21 @@ public class ClusterManager
post(event);
}
}
private static Collection<Partition> buildDefaultPartitions(Collection<ControllerNode> nodes) {
List<ControllerNode> sorted = new ArrayList<>(nodes);
Collections.sort(sorted, (o1, o2) -> o1.id().toString().compareTo(o2.id().toString()));
Collection<Partition> partitions = Lists.newArrayList();
int length = nodes.size();
int count = 3;
for (int i = 0; i < length; i++) {
Set<NodeId> set = new HashSet<>(count);
for (int j = 0; j < count; j++) {
set.add(sorted.get((i + j) % length).id());
}
partitions.add(new Partition("p" + (i + 1), set));
}
return partitions;
}
}
......
package org.onosproject.cluster.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Collection;
import java.util.Enumeration;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.packet.IpAddress;
import org.onosproject.cluster.ClusterMetadata;
import org.onosproject.cluster.ClusterMetadataEvent;
import org.onosproject.cluster.ClusterMetadataEventListener;
import org.onosproject.cluster.ClusterMetadataService;
import org.onosproject.cluster.ClusterMetadataStore;
import org.onosproject.cluster.ClusterMetadataStoreDelegate;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.event.AbstractListenerManager;
import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
/**
* Implementation of ClusterMetadataService.
*/
@Component(immediate = true)
@Service
public class ClusterMetadataManager
extends AbstractListenerManager<ClusterMetadataEvent, ClusterMetadataEventListener>
implements ClusterMetadataService {
private ControllerNode localNode;
private final Logger log = getLogger(getClass());
private ClusterMetadataStoreDelegate delegate = new InternalStoreDelegate();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterMetadataStore store;
@Activate
public void activate() {
store.setDelegate(delegate);
eventDispatcher.addSink(ClusterMetadataEvent.class, listenerRegistry);
establishSelfIdentity();
log.info("Started");
}
@Deactivate
public void deactivate() {
store.unsetDelegate(delegate);
eventDispatcher.removeSink(ClusterMetadataEvent.class);
log.info("Stopped");
}
@Override
public ClusterMetadata getClusterMetadata() {
return Versioned.valueOrElse(store.getClusterMetadata(), null);
}
@Override
public ControllerNode getLocalNode() {
return localNode;
}
@Override
public void setClusterMetadata(ClusterMetadata metadata) {
checkNotNull(metadata, "Cluster metadata cannot be null");
store.setClusterMetadata(metadata);
}
// Store delegate to re-post events emitted from the store.
private class InternalStoreDelegate implements ClusterMetadataStoreDelegate {
@Override
public void notify(ClusterMetadataEvent event) {
post(event);
}
}
private IpAddress findLocalIp(Collection<ControllerNode> controllerNodes) throws SocketException {
Enumeration<NetworkInterface> interfaces =
NetworkInterface.getNetworkInterfaces();
while (interfaces.hasMoreElements()) {
NetworkInterface iface = interfaces.nextElement();
Enumeration<InetAddress> inetAddresses = iface.getInetAddresses();
while (inetAddresses.hasMoreElements()) {
IpAddress ip = IpAddress.valueOf(inetAddresses.nextElement());
if (controllerNodes.stream()
.map(ControllerNode::ip)
.anyMatch(nodeIp -> ip.equals(nodeIp))) {
return ip;
}
}
}
throw new IllegalStateException("Unable to determine local ip");
}
private void establishSelfIdentity() {
try {
IpAddress ip = findLocalIp(getClusterMetadata().getNodes());
localNode = getClusterMetadata().getNodes()
.stream()
.filter(node -> node.ip().equals(ip))
.findFirst()
.get();
} catch (SocketException e) {
throw new IllegalStateException("Cannot determine local IP", e);
}
}
}
\ No newline at end of file
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.cluster.impl;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onlab.packet.IpAddress;
import org.onosproject.cluster.ClusterDefinitionService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.consistent.impl.DatabaseDefinition;
import org.onosproject.store.consistent.impl.DatabaseDefinitionStore;
import org.slf4j.Logger;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Enumeration;
import java.util.Set;
import java.util.stream.Collectors;
import static java.net.NetworkInterface.getNetworkInterfaces;
import static java.util.Collections.list;
import static org.onosproject.cluster.DefaultControllerNode.DEFAULT_PORT;
import static org.onosproject.store.consistent.impl.DatabaseManager.PARTITION_DEFINITION_FILE;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Implementation of ClusterDefinitionService.
*/
@Component(immediate = true)
@Service
public class ClusterDefinitionManager implements ClusterDefinitionService {
public static final String CLUSTER_DEFINITION_FILE = "../config/cluster.json";
private static final String ONOS_NIC = "ONOS_NIC";
private static final Logger log = getLogger(ClusterDefinitionManager.class);
private ControllerNode localNode;
private Set<ControllerNode> seedNodes;
@Activate
public void activate() {
File clusterDefinitionFile = new File(CLUSTER_DEFINITION_FILE);
ClusterDefinitionStore clusterDefinitionStore =
new ClusterDefinitionStore(clusterDefinitionFile.getPath());
if (!clusterDefinitionFile.exists()) {
createDefaultClusterDefinition(clusterDefinitionStore);
}
try {
ClusterDefinition clusterDefinition = clusterDefinitionStore.read();
establishSelfIdentity(clusterDefinition);
seedNodes = ImmutableSet
.copyOf(clusterDefinition.getNodes())
.stream()
.filter(n -> !localNode.id().equals(new NodeId(n.getId())))
.map(n -> new DefaultControllerNode(new NodeId(n.getId()),
IpAddress.valueOf(n.getIp()),
n.getTcpPort()))
.collect(Collectors.toSet());
} catch (IOException e) {
throw new IllegalStateException("Failed to read cluster definition.", e);
}
log.info("Started");
}
@Deactivate
public void deactivate() {
log.info("Stopped");
}
@Override
public ControllerNode localNode() {
return localNode;
}
@Override
public Set<ControllerNode> seedNodes() {
return seedNodes;
}
@Override
public void formCluster(Set<ControllerNode> nodes, String ipPrefix) {
try {
Set<NodeInfo> infos = Sets.newHashSet();
nodes.forEach(n -> infos.add(NodeInfo.from(n.id().toString(),
n.ip().toString(),
n.tcpPort())));
ClusterDefinition cdef = ClusterDefinition.from(infos, ipPrefix);
new ClusterDefinitionStore(CLUSTER_DEFINITION_FILE).write(cdef);
DatabaseDefinition ddef = DatabaseDefinition.from(infos);
new DatabaseDefinitionStore(PARTITION_DEFINITION_FILE).write(ddef);
} catch (IOException e) {
log.error("Unable to form cluster", e);
}
}
private IpAddress findLocalIp(ClusterDefinition clusterDefinition) throws SocketException {
Enumeration<NetworkInterface> interfaces =
NetworkInterface.getNetworkInterfaces();
while (interfaces.hasMoreElements()) {
NetworkInterface iface = interfaces.nextElement();
Enumeration<InetAddress> inetAddresses = iface.getInetAddresses();
while (inetAddresses.hasMoreElements()) {
IpAddress ip = IpAddress.valueOf(inetAddresses.nextElement());
if (clusterDefinition.getNodes().stream()
.map(NodeInfo::getIp)
.map(IpAddress::valueOf)
.anyMatch(nodeIp -> ip.equals(nodeIp))) {
return ip;
}
}
}
throw new IllegalStateException("Unable to determine local ip");
}
private void establishSelfIdentity(ClusterDefinition clusterDefinition) {
try {
IpAddress ip = findLocalIp(clusterDefinition);
localNode = new DefaultControllerNode(new NodeId(ip.toString()), ip);
} catch (SocketException e) {
throw new IllegalStateException("Cannot determine local IP", e);
}
}
private void createDefaultClusterDefinition(ClusterDefinitionStore store) {
// Assumes IPv4 is returned.
String ip = getSiteLocalAddress();
String ipPrefix = ip.replaceFirst("\\.[0-9]*$", ".*");
NodeInfo node = NodeInfo.from(ip, ip, DEFAULT_PORT);
try {
store.write(ClusterDefinition.from(ImmutableSet.of(node), ipPrefix));
} catch (IOException e) {
log.warn("Unable to write default cluster definition", e);
}
}
/**
* Returns the address that matches the IP prefix given in ONOS_NIC
* environment variable if one was specified, or the first site local
* address if one can be found or the loopback address otherwise.
*
* @return site-local address in string form
*/
public static String getSiteLocalAddress() {
try {
String ipPrefix = System.getenv(ONOS_NIC);
for (NetworkInterface nif : list(getNetworkInterfaces())) {
for (InetAddress address : list(nif.getInetAddresses())) {
IpAddress ip = IpAddress.valueOf(address);
if (ipPrefix == null && address.isSiteLocalAddress() ||
ipPrefix != null && matchInterface(ip.toString(), ipPrefix)) {
return ip.toString();
}
}
}
} catch (SocketException e) {
log.error("Unable to get network interfaces", e);
}
return IpAddress.valueOf(InetAddress.getLoopbackAddress()).toString();
}
// Indicates whether the specified interface address matches the given prefix.
// FIXME: Add a facility to IpPrefix to make this more robust
private static boolean matchInterface(String ip, String ipPrefix) {
String s = ipPrefix.replaceAll("\\.\\*", "");
return ip.startsWith(s);
}
}
/*
* Copyright 2014-2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.cluster.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.io.Files;
import java.io.File;
import java.io.IOException;
/**
* Allows for reading and writing cluster definition as a JSON file.
*/
public class ClusterDefinitionStore {
private final File file;
/**
* Creates a reader/writer of the cluster definition file.
* @param filePath location of the definition file
*/
public ClusterDefinitionStore(String filePath) {
file = new File(filePath);
}
/**
* Returns the cluster definition.
* @return cluster definition
* @throws IOException when I/O exception of some sort has occurred
*/
public ClusterDefinition read() throws IOException {
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(file, ClusterDefinition.class);
}
/**
* Writes the specified cluster definition to file.
* @param definition cluster definition
* @throws IOException when I/O exception of some sort has occurred
*/
public void write(ClusterDefinition definition) throws IOException {
checkNotNull(definition);
// write back to file
Files.createParentDirs(file);
ObjectMapper mapper = new ObjectMapper();
mapper.writeValue(file, definition);
}
}
......@@ -27,8 +27,8 @@ import org.apache.felix.scr.annotations.Service;
import org.joda.time.DateTime;
import org.onlab.packet.IpAddress;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterDefinitionService;
import org.onosproject.cluster.ClusterEvent;
import org.onosproject.cluster.ClusterMetadataService;
import org.onosproject.cluster.ClusterStore;
import org.onosproject.cluster.ClusterStoreDelegate;
import org.onosproject.cluster.ControllerNode;
......@@ -99,14 +99,14 @@ public class DistributedClusterStore
private ControllerNode localNode;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterDefinitionService clusterDefinitionService;
protected ClusterMetadataService clusterMetadataService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MessagingService messagingService;
@Activate
public void activate() {
localNode = clusterDefinitionService.localNode();
localNode = clusterMetadataService.getLocalNode();
messagingService.registerHandler(HEARTBEAT_MESSAGE,
new HeartbeatMessageHandler(), heartBeatMessageHandler);
......@@ -116,9 +116,6 @@ public class DistributedClusterStore
heartBeatSender.scheduleWithFixedDelay(this::heartbeat, 0,
HEARTBEAT_INTERVAL_MS, TimeUnit.MILLISECONDS);
addNode(localNode);
updateState(localNode.id(), State.ACTIVE);
log.info("Started");
}
......@@ -188,7 +185,7 @@ public class DistributedClusterStore
private void addNode(ControllerNode node) {
allNodes.put(node.id(), node);
updateState(node.id(), State.INACTIVE);
updateState(node.id(), node.equals(localNode) ? State.ACTIVE : State.INACTIVE);
notifyDelegate(new ClusterEvent(ClusterEvent.Type.INSTANCE_ADDED, node));
}
......
package org.onosproject.store.cluster.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.net.NetworkInterface.getNetworkInterfaces;
import static org.slf4j.LoggerFactory.getLogger;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onlab.packet.IpAddress;
import org.onosproject.cluster.ClusterMetadata;
import org.onosproject.cluster.ClusterMetadataEvent;
import org.onosproject.cluster.ClusterMetadataStore;
import org.onosproject.cluster.ClusterMetadataStoreDelegate;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.cluster.Partition;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
/**
* ClusterMetadataStore backed by a local file.
*/
@Component(immediate = true, enabled = true)
@Service
public class StaticClusterMetadataStore
extends AbstractStore<ClusterMetadataEvent, ClusterMetadataStoreDelegate>
implements ClusterMetadataStore {
private final Logger log = getLogger(getClass());
private static final String CLUSTER_METADATA_FILE = "../config/cluster.json";
private static final int DEFAULT_ONOS_PORT = 9876;
private final File metadataFile = new File(CLUSTER_METADATA_FILE);
private AtomicReference<ClusterMetadata> metadata = new AtomicReference<>();
private ObjectMapper mapper;
private long version;
@Activate
public void activate() {
mapper = new ObjectMapper();
SimpleModule module = new SimpleModule();
module.addSerializer(NodeId.class, new NodeIdSerializer());
module.addDeserializer(NodeId.class, new NodeIdDeserializer());
module.addSerializer(ControllerNode.class, new ControllerNodeSerializer());
module.addDeserializer(ControllerNode.class, new ControllerNodeDeserializer());
mapper.registerModule(module);
File metadataFile = new File(CLUSTER_METADATA_FILE);
if (metadataFile.exists()) {
try {
metadata.set(mapper.readValue(metadataFile, ClusterMetadata.class));
version = metadataFile.lastModified();
} catch (IOException e) {
Throwables.propagate(e);
}
} else {
String localIp = getSiteLocalAddress();
ControllerNode localNode =
new DefaultControllerNode(new NodeId(localIp), IpAddress.valueOf(localIp), DEFAULT_ONOS_PORT);
metadata.set(ClusterMetadata.builder()
.withName("default")
.withControllerNodes(Arrays.asList(localNode))
.withPartitions(Lists.newArrayList(new Partition("p1", Lists.newArrayList(localNode.id()))))
.build());
version = System.currentTimeMillis();
}
log.info("Started");
}
@Deactivate
public void deactivate() {
log.info("Stopped");
}
@Override
public void setDelegate(ClusterMetadataStoreDelegate delegate) {
checkNotNull(delegate, "Delegate cannot be null");
this.delegate = delegate;
}
@Override
public void unsetDelegate(ClusterMetadataStoreDelegate delegate) {
this.delegate = null;
}
@Override
public boolean hasDelegate() {
return this.delegate != null;
}
@Override
public Versioned<ClusterMetadata> getClusterMetadata() {
return new Versioned<>(metadata.get(), version);
}
@Override
public void setClusterMetadata(ClusterMetadata metadata) {
checkNotNull(metadata);
try {
Files.createParentDirs(metadataFile);
mapper.writeValue(metadataFile, metadata);
this.metadata.set(metadata);
} catch (IOException e) {
Throwables.propagate(e);
}
}
@Override
public void setActiveReplica(String partitionId, NodeId nodeId) {
throw new UnsupportedOperationException();
}
@Override
public void unsetActiveReplica(String partitionId, NodeId nodeId) {
throw new UnsupportedOperationException();
}
@Override
public Collection<NodeId> getActiveReplicas(String partitionId) {
return metadata.get().getPartitions()
.stream()
.filter(r -> r.getName().equals(partitionId))
.findFirst()
.map(r -> r.getMembers())
.orElse(null);
}
private static class ControllerNodeSerializer extends JsonSerializer<ControllerNode> {
@Override
public void serialize(ControllerNode node, JsonGenerator jgen, SerializerProvider provider)
throws IOException, JsonProcessingException {
jgen.writeStartObject();
jgen.writeStringField("id", node.id().toString());
jgen.writeStringField("ip", node.ip().toString());
jgen.writeNumberField("port", node.tcpPort());
jgen.writeEndObject();
}
}
private static class ControllerNodeDeserializer extends JsonDeserializer<ControllerNode> {
@Override
public ControllerNode deserialize(JsonParser jp, DeserializationContext ctxt)
throws IOException, JsonProcessingException {
JsonNode node = jp.getCodec().readTree(jp);
NodeId nodeId = new NodeId(node.get("id").textValue());
IpAddress ip = IpAddress.valueOf(node.get("ip").textValue());
int port = node.get("port").asInt();
return new DefaultControllerNode(nodeId, ip, port);
}
}
private static class NodeIdSerializer extends JsonSerializer<NodeId> {
@Override
public void serialize(NodeId nodeId, JsonGenerator jgen, SerializerProvider provider)
throws IOException, JsonProcessingException {
jgen.writeString(nodeId.toString());
}
}
private class NodeIdDeserializer extends JsonDeserializer<NodeId> {
@Override
public NodeId deserialize(JsonParser jp, DeserializationContext ctxt)
throws IOException, JsonProcessingException {
JsonNode node = jp.getCodec().readTree(jp);
return new NodeId(node.asText());
}
}
private static String getSiteLocalAddress() {
Function<NetworkInterface, IpAddress> ipLookup = nif -> {
for (InetAddress address : Collections.list(nif.getInetAddresses())) {
if (address.isSiteLocalAddress()) {
return IpAddress.valueOf(address);
}
}
return null;
};
try {
IpAddress ip = ipLookup.apply(NetworkInterface.getByName("eth0"));
if (ip != null) {
return ip.toString();
}
for (NetworkInterface nif : Collections.list(getNetworkInterfaces())) {
ip = ipLookup.apply(nif);
if (ip != null) {
return ip.toString();
}
}
} catch (Exception e) {
throw new IllegalStateException("Unable to get network interfaces", e);
}
return IpAddress.valueOf(InetAddress.getLoopbackAddress()).toString();
}
}
\ No newline at end of file
......@@ -22,7 +22,7 @@ import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.nio.service.IOLoopMessaging;
import org.onosproject.cluster.ClusterDefinitionService;
import org.onosproject.cluster.ClusterMetadataService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.store.cluster.messaging.Endpoint;
import org.slf4j.Logger;
......@@ -38,11 +38,11 @@ public class IOLoopMessagingManager extends IOLoopMessaging {
private final Logger log = LoggerFactory.getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterDefinitionService clusterDefinitionService;
protected ClusterMetadataService clusterMetadataService;
@Activate
public void activate() throws Exception {
ControllerNode localNode = clusterDefinitionService.localNode();
ControllerNode localNode = clusterMetadataService.getLocalNode();
super.start(new Endpoint(localNode.ip(), localNode.tcpPort()));
log.info("Started");
}
......
......@@ -16,6 +16,7 @@
package org.onosproject.store.cluster.messaging.impl;
import com.google.common.base.Strings;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -23,7 +24,7 @@ import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.netty.NettyMessaging;
import org.onosproject.cluster.ClusterDefinitionService;
import org.onosproject.cluster.ClusterMetadataService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.store.cluster.messaging.Endpoint;
import org.slf4j.Logger;
......@@ -41,11 +42,11 @@ public class NettyMessagingManager extends NettyMessaging {
private static final short MIN_KS_LENGTH = 6;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterDefinitionService clusterDefinitionService;
protected ClusterMetadataService clusterMetadataService;
@Activate
public void activate() throws Exception {
ControllerNode localNode = clusterDefinitionService.localNode();
ControllerNode localNode = clusterMetadataService.getLocalNode();
getTLSParameters();
super.start(new Endpoint(localNode.ip(), localNode.tcpPort()));
log.info("Started");
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.consistent.impl;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import org.onosproject.store.cluster.impl.NodeInfo;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* Partitioned database configuration.
*/
public class DatabaseDefinition {
private Map<String, Set<NodeInfo>> partitions;
private Set<NodeInfo> nodes;
/**
* Creates a new DatabaseDefinition.
*
* @param partitions partition map
* @param nodes set of nodes
* @return database definition
*/
public static DatabaseDefinition from(Map<String, Set<NodeInfo>> partitions,
Set<NodeInfo> nodes) {
checkNotNull(partitions);
checkNotNull(nodes);
DatabaseDefinition definition = new DatabaseDefinition();
definition.partitions = ImmutableMap.copyOf(partitions);
definition.nodes = ImmutableSet.copyOf(nodes);
return definition;
}
/**
* Creates a new DatabaseDefinition using default partitions.
*
* @param nodes set of nodes
* @return database definition
*/
public static DatabaseDefinition from(Set<NodeInfo> nodes) {
return from(generateDefaultPartitions(nodes), nodes);
}
/**
* Returns the map of database partitions.
*
* @return db partition map
*/
public Map<String, Set<NodeInfo>> getPartitions() {
return partitions;
}
/**
* Returns the set of nodes.
*
* @return nodes
*/
public Set<NodeInfo> getNodes() {
return nodes;
}
/**
* Generates set of default partitions using permutations of the nodes.
*
* @param nodes information about cluster nodes
* @return default partition map
*/
private static Map<String, Set<NodeInfo>> generateDefaultPartitions(Set<NodeInfo> nodes) {
List<NodeInfo> sorted = new ArrayList<>(nodes);
Collections.sort(sorted, (o1, o2) -> o1.getId().compareTo(o2.getId()));
Map<String, Set<NodeInfo>> partitions = Maps.newHashMap();
int length = nodes.size();
int count = 3;
for (int i = 0; i < length; i++) {
Set<NodeInfo> set = new HashSet<>(count);
for (int j = 0; j < count; j++) {
set.add(sorted.get((i + j) % length));
}
partitions.put("p" + (i + 1), set);
}
return partitions;
}
}
\ No newline at end of file
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.consistent.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import java.io.File;
import java.io.IOException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.io.Files;
/**
* Allows for reading and writing partitioned database definition as a JSON file.
*/
public class DatabaseDefinitionStore {
private final File file;
/**
* Creates a reader/writer of the database definition file.
*
* @param filePath location of the definition file
*/
public DatabaseDefinitionStore(String filePath) {
file = new File(checkNotNull(filePath));
}
/**
* Creates a reader/writer of the database definition file.
*
* @param filePath location of the definition file
*/
public DatabaseDefinitionStore(File filePath) {
file = checkNotNull(filePath);
}
/**
* Returns the database definition.
*
* @return database definition
* @throws IOException when I/O exception of some sort has occurred.
*/
public DatabaseDefinition read() throws IOException {
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(file, DatabaseDefinition.class);
}
/**
* Writes the specified database definition to file.
*
* @param definition database definition
* @throws IOException when I/O exception of some sort has occurred.
*/
public void write(DatabaseDefinition definition) throws IOException {
checkNotNull(definition);
// write back to file
Files.createParentDirs(file);
ObjectMapper mapper = new ObjectMapper();
mapper.writeValue(file, definition);
}
}
......@@ -18,7 +18,6 @@ package org.onosproject.store.consistent.impl;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
......@@ -50,12 +49,12 @@ import org.apache.felix.scr.annotations.Service;
import org.onosproject.app.ApplicationEvent;
import org.onosproject.app.ApplicationListener;
import org.onosproject.app.ApplicationService;
import org.onosproject.cluster.ClusterMetadataService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.IdGenerator;
import org.onosproject.store.cluster.impl.ClusterDefinitionManager;
import org.onosproject.store.cluster.impl.NodeInfo;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.ecmap.EventuallyConsistentMapBuilderImpl;
import org.onosproject.store.service.AtomicCounterBuilder;
......@@ -73,8 +72,6 @@ import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.TransactionContextBuilder;
import org.slf4j.Logger;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
......@@ -99,8 +96,6 @@ public class DatabaseManager implements StorageService, StorageAdminService {
private final Logger log = getLogger(getClass());
public static final int COPYCAT_TCP_PORT = 9876;
public static final String PARTITION_DEFINITION_FILE = "../config/tablets.json";
public static final String BASE_PARTITION_NAME = "p0";
private static final int RAFT_ELECTION_TIMEOUT_MILLIS = 3000;
......@@ -122,6 +117,9 @@ public class DatabaseManager implements StorageService, StorageAdminService {
Multimaps.synchronizedMultimap(ArrayListMultimap.create());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterMetadataService clusterMetadataService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY, policy = ReferencePolicy.DYNAMIC)
......@@ -130,8 +128,9 @@ public class DatabaseManager implements StorageService, StorageAdminService {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService clusterCommunicator;
protected String nodeToUri(NodeInfo node) {
return String.format("onos://%s:%d", node.getIp(), node.getTcpPort());
protected String nodeIdToUri(NodeId nodeId) {
ControllerNode node = clusterService.getNode(nodeId);
return String.format("onos://%s:%d", node.ip(), node.tcpPort());
}
protected void bindApplicationService(ApplicationService service) {
......@@ -147,30 +146,22 @@ public class DatabaseManager implements StorageService, StorageAdminService {
@Activate
public void activate() {
localNodeId = clusterService.getLocalNode().id();
// load database configuration
File databaseDefFile = new File(PARTITION_DEFINITION_FILE);
log.info("Loading database definition: {}", databaseDefFile.getAbsolutePath());
Map<String, Set<NodeInfo>> partitionMap;
try {
DatabaseDefinitionStore databaseDefStore = new DatabaseDefinitionStore(databaseDefFile);
if (!databaseDefFile.exists()) {
createDefaultDatabaseDefinition(databaseDefStore);
}
partitionMap = databaseDefStore.read().getPartitions();
} catch (IOException e) {
throw new IllegalStateException("Failed to load database config", e);
}
Map<String, Set<NodeId>> partitionMap = Maps.newHashMap();
clusterMetadataService.getClusterMetadata().getPartitions().forEach(p -> {
partitionMap.put(p.getName(), Sets.newHashSet(p.getMembers()));
});
String[] activeNodeUris = partitionMap.values()
.stream()
.reduce((s1, s2) -> Sets.union(s1, s2))
.get()
.stream()
.map(this::nodeToUri)
.map(this::nodeIdToUri)
.toArray(String[]::new);
String localNodeUri = nodeToUri(NodeInfo.of(clusterService.getLocalNode()));
String localNodeUri = nodeIdToUri(clusterMetadataService.getLocalNode().id());
Protocol protocol = new CopycatCommunicationProtocol(clusterService, clusterCommunicator);
ClusterConfig clusterConfig = new ClusterConfig()
......@@ -198,7 +189,7 @@ public class DatabaseManager implements StorageService, StorageAdminService {
List<Database> partitions = partitionMap.entrySet()
.stream()
.map(entry -> {
String[] replicas = entry.getValue().stream().map(this::nodeToUri).toArray(String[]::new);
String[] replicas = entry.getValue().stream().map(this::nodeIdToUri).toArray(String[]::new);
return newDatabaseConfig(entry.getKey(), newPersistentLog(), replicas);
})
.map(config -> {
......@@ -229,17 +220,6 @@ public class DatabaseManager implements StorageService, StorageAdminService {
log.info("Started");
}
private void createDefaultDatabaseDefinition(DatabaseDefinitionStore store) {
// Assumes IPv4 is returned.
String ip = ClusterDefinitionManager.getSiteLocalAddress();
NodeInfo node = NodeInfo.from(ip, ip, COPYCAT_TCP_PORT);
try {
store.write(DatabaseDefinition.from(ImmutableSet.of(node)));
} catch (IOException e) {
log.warn("Unable to write default cluster definition", e);
}
}
@Deactivate
public void deactivate() {
CompletableFuture.allOf(inMemoryDatabase.close(), partitionedDatabase.close())
......
......@@ -37,17 +37,6 @@ export ONOS_BOOT_FEATURES="${ONOS_BOOT_FEATURES:-webconsole,onos-api,onos-core,o
# ONOS builtin apps and providers ignited by default
export ONOS_APPS="${ONOS_APPS:-drivers,openflow}"
# Generate a cluster.json from the ON* environment variables
CDEF_FILE=/tmp/${remote}.cluster.json
echo "{ \"ipPrefix\": \"$ONOS_NIC\"," > $CDEF_FILE
echo " \"nodes\":[" >> $CDEF_FILE
for node in $(env | sort | egrep "OC[2-9]+" | cut -d= -f2); do
echo " { \"id\": \"$node\", \"ip\": \"$node\", \"tcpPort\": 9876 }," >> $CDEF_FILE
done
echo " { \"id\": \"$OC1\", \"ip\": \"$OC1\", \"tcpPort\": 9876 }" >> $CDEF_FILE
echo "]}" >> $CDEF_FILE
scp -q $CDEF_FILE $remote:$ONOS_INSTALL_DIR/config/cluster.json
ssh $remote "
echo \"onos.ip = \$(sudo ifconfig | grep $ONOS_NIC | cut -d: -f2 | cut -d\\ -f1)\" \
>> $ONOS_INSTALL_DIR/$KARAF_DIST/etc/system.properties
......@@ -66,10 +55,10 @@ ssh $remote "
done
"
# Generate a default tablets.json from the ON* environment variables
TDEF_FILE=/tmp/${remote}.tablets.json
onos-gen-partitions $TDEF_FILE
scp -q $TDEF_FILE $remote:$ONOS_INSTALL_DIR/config/tablets.json
# Generate a default cluster.json from the ON* environment variables
CDEF_FILE=/tmp/${remote}.cluster.json
onos-gen-partitions $CDEF_FILE
scp -q $CDEF_FILE $remote:$ONOS_INSTALL_DIR/config/cluster.json
# Copy tools/package/config/ to remote
scp -qr ${ONOS_ROOT}/tools/package/config/ $remote:$ONOS_INSTALL_DIR/
......
......@@ -23,22 +23,27 @@ def get_OC_vars():
return sorted(vars, key=alphanum_key)
def get_nodes(vars, port=9876):
node = lambda k: { 'id': k, 'ip': k, 'tcpPort': port }
node = lambda k: { 'id': k, 'ip': k, 'port': port }
return [ node(environ[v]) for v in vars ]
def generate_permutations(nodes, k):
l = deque(nodes)
perms = {}
perms = []
for i in range(1, len(nodes)+1):
perms['p%d' % i] = list(l)[:k]
part = {
'name': 'p%d' % i,
'members': list(l)[:k]
}
perms.append(part)
l.rotate(-1)
return OrderedDict(sorted(perms.iteritems(), key=lambda (k, v): alphanum_key(k)))
return perms
if __name__ == '__main__':
vars = get_OC_vars()
nodes = get_nodes(vars)
partitions = generate_permutations(nodes, 3)
partitions = generate_permutations([v.get('id') for v in nodes], 3)
data = {
'name': 'default',
'nodes': nodes,
'partitions': partitions
}
......
......@@ -84,10 +84,9 @@ public class ClusterWebResource extends AbstractWebResource {
public Response formCluster(InputStream config) throws IOException {
JsonCodec<ControllerNode> codec = codec(ControllerNode.class);
ObjectNode root = (ObjectNode) mapper().readTree(config);
String ipPrefix = root.path("ipPrefix").asText();
List<ControllerNode> nodes = codec.decode((ArrayNode) root.path("nodes"), this);
get(ClusterAdminService.class).formCluster(new HashSet<>(nodes), ipPrefix);
get(ClusterAdminService.class).formCluster(new HashSet<>(nodes));
return Response.ok().build();
}
......