Madan Jampani
Committed by Gerrit Code Review

Using provider pattern for cluster metadata.

Change-Id: I5a572b3df9149be959dde9868a9c594dec26a3e0
......@@ -20,10 +20,10 @@ import java.util.Collection;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.onosproject.net.Provided;
import org.onosproject.net.provider.ProviderId;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Verify.verifyNotNull;
import static com.google.common.base.Verify.verify;
import com.google.common.base.MoreObjects;
......@@ -38,21 +38,49 @@ import com.google.common.collect.Sets;
* of {@link org.onosproject.cluster.ControllerNode nodes} and the collection of data
* {@link org.onosproject.cluster.Partition partitions}.
*/
public final class ClusterMetadata {
public final class ClusterMetadata implements Provided {
// Name to use when the ClusterMetadataService is in transient state
public static final String NO_NAME = "";
private String name;
private Set<ControllerNode> nodes;
private Set<Partition> partitions;
private final ProviderId providerId;
private final String name;
private final Set<ControllerNode> nodes;
private final Set<Partition> partitions;
private ClusterMetadata() {
providerId = null;
name = null;
nodes = null;
partitions = null;
}
public ClusterMetadata(ProviderId providerId,
String name,
Set<ControllerNode> nodes,
Set<Partition> partitions) {
this.providerId = checkNotNull(providerId);
this.name = checkNotNull(name);
this.nodes = ImmutableSet.copyOf(checkNotNull(nodes));
// verify that partitions are constituted from valid cluster nodes.
boolean validPartitions = Collections2.transform(nodes, ControllerNode::id)
.containsAll(partitions
.stream()
.flatMap(r -> r.getMembers().stream())
.collect(Collectors.toSet()));
verify(validPartitions, "Partition locations must be valid cluster nodes");
this.partitions = ImmutableSet.copyOf(checkNotNull(partitions));
}
/**
* Returns a new cluster metadata builder.
* @return The cluster metadata builder.
*/
public static Builder builder() {
return new Builder();
public ClusterMetadata(String name,
Set<ControllerNode> nodes,
Set<Partition> partitions) {
this(new ProviderId("none", "none"), name, nodes, partitions);
}
@Override
public ProviderId providerId() {
return providerId;
}
/**
......@@ -84,6 +112,7 @@ public final class ClusterMetadata {
@Override
public String toString() {
return MoreObjects.toStringHelper(ClusterMetadata.class)
.add("providerId", providerId)
.add("name", name)
.add("nodes", nodes)
.add("partitions", partitions)
......@@ -92,7 +121,7 @@ public final class ClusterMetadata {
@Override
public int hashCode() {
return Arrays.deepHashCode(new Object[] {name, nodes, partitions});
return Arrays.deepHashCode(new Object[] {providerId, name, nodes, partitions});
}
/*
......@@ -116,74 +145,4 @@ public final class ClusterMetadata {
return Sets.symmetricDifference(this.nodes, that.nodes).isEmpty()
&& Sets.symmetricDifference(this.partitions, that.partitions).isEmpty();
}
/**
* Builder for a {@link ClusterMetadata} instance.
*/
public static class Builder {
private final ClusterMetadata metadata;
public Builder() {
metadata = new ClusterMetadata();
}
/**
* Sets the cluster name, returning the cluster metadata builder for method chaining.
* @param name cluster name
* @return this cluster metadata builder
*/
public Builder withName(String name) {
metadata.name = checkNotNull(name);
return this;
}
/**
* Sets the collection of cluster nodes, returning the cluster metadata builder for method chaining.
* @param controllerNodes collection of cluster nodes
* @return this cluster metadata builder
*/
public Builder withControllerNodes(Collection<ControllerNode> controllerNodes) {
metadata.nodes = ImmutableSet.copyOf(checkNotNull(controllerNodes));
return this;
}
/**
* Sets the partitions, returning the cluster metadata builder for method chaining.
* @param partitions collection of partitions
* @return this cluster metadata builder
*/
public Builder withPartitions(Collection<Partition> partitions) {
metadata.partitions = ImmutableSet.copyOf(checkNotNull(partitions));
return this;
}
/**
* Builds the cluster metadata.
* @return cluster metadata
* @throws com.google.common.base.VerifyException VerifyException if the metadata is misconfigured
*/
public ClusterMetadata build() {
verifyMetadata();
return metadata;
}
/**
* Validates the constructed metadata for semantic correctness.
* @throws VerifyException if the metadata is misconfigured.
*/
private void verifyMetadata() {
verifyNotNull(metadata.getName(), "Cluster name must be specified");
verify(CollectionUtils.isNotEmpty(metadata.getNodes()), "Cluster nodes must be specified");
verify(CollectionUtils.isNotEmpty(metadata.getPartitions()), "Cluster partitions must be specified");
// verify that partitions are constituted from valid cluster nodes.
boolean validPartitions = Collections2.transform(metadata.getNodes(), ControllerNode::id)
.containsAll(metadata.getPartitions()
.stream()
.flatMap(r -> r.getMembers().stream())
.collect(Collectors.toSet()));
verify(validPartitions, "Partition locations must be valid cluster nodes");
}
}
}
......
/*
* Copyright 2015 Open Networking Laboratory
* Copyright 2015-2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -15,27 +15,31 @@
*/
package org.onosproject.cluster;
import java.util.Collection;
import java.util.Set;
import org.onosproject.store.Store;
import org.onosproject.net.provider.Provider;
import org.onosproject.store.service.Versioned;
/**
* Manages persistence of {@link ClusterMetadata cluster metadata}; not intended for direct use.
* Abstraction of a {@link ClusterMetadata cluster metadata} provider.
*/
public interface ClusterMetadataStore extends Store<ClusterMetadataEvent, ClusterMetadataStoreDelegate> {
public interface ClusterMetadataProvider extends Provider {
/**
* Returns the cluster metadata.
* <p>
* The retuned metadata is encapsulated as a {@link Versioned versioned} and therefore has a specific version.
* Tells if this provider is currently available and therefore can provide ClusterMetadata.
* @return {@code true} if this provider is available and can provide cluster metadata.
*/
boolean isAvailable();
/**
* Returns the current cluster metadata.
* @return cluster metadata
*/
Versioned<ClusterMetadata> getClusterMetadata();
/**
* Updates the cluster metadata.
* @param metadata new metadata value
* Updates cluster metadata.
* @param metadata new metadata
*/
void setClusterMetadata(ClusterMetadata metadata);
......@@ -54,12 +58,12 @@ public interface ClusterMetadataStore extends Store<ClusterMetadataEvent, Cluste
/**
* Removes a controller node from the list of active members for a partition.
* @param partitionId partition identifier
* @param nodeId id of controller node
* @param nodeId identifier of controller node
*/
void removeActivePartitionMember(PartitionId partitionId, NodeId nodeId);
/**
* Returns the collection of controller nodes that are the active members for a partition.
* Returns the set of controller nodes that are the active members for a partition.
* <p>
* Active members of a partition are typically those that are actively
* participating in the data replication protocol being employed. When
......@@ -72,5 +76,5 @@ public interface ClusterMetadataStore extends Store<ClusterMetadataEvent, Cluste
* @param partitionId partition identifier
* @return identifiers of controller nodes that are active members
*/
Collection<NodeId> getActivePartitionMembers(PartitionId partitionId);
Set<NodeId> getActivePartitionMembers(PartitionId partitionId);
}
......
/*
* Copyright 2015 Open Networking Laboratory
* Copyright 2015-2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -15,10 +15,11 @@
*/
package org.onosproject.cluster;
import org.onosproject.store.StoreDelegate;
import org.onosproject.net.provider.ProviderRegistry;
/**
* Cluster metadata store delegate abstraction.
* Abstraction of a cluster metadata provider registry.
*/
public interface ClusterMetadataStoreDelegate extends StoreDelegate<ClusterMetadataEvent> {
public interface ClusterMetadataProviderRegistry
extends ProviderRegistry<ClusterMetadataProvider, ClusterMetadataProviderService> {
}
......
/*
* Copyright 2015-2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.cluster;
import org.onosproject.net.provider.ProviderService;
import org.onosproject.store.service.Versioned;
/**
* Service through which a {@link ClusterMetadataProvider provider} can notify core of
* updates made to cluster metadata.
*/
public interface ClusterMetadataProviderService extends ProviderService<ClusterMetadataProvider> {
/**
* Notifies about a change to cluster metadata.
* @param newMetadata new cluster metadata value
*/
void clusterMetadataChanged(Versioned<ClusterMetadata> newMetadata);
/**
* Notifies that a node just become the active member of a partition.
* @param partitionId partition identifier
* @param nodeId identifier of node
*/
void newActiveMemberForPartition(PartitionId partitionId, NodeId nodeId);
}
......@@ -108,4 +108,14 @@ public abstract class AbstractProviderRegistry<P extends Provider, S extends Pro
return providersByScheme.get(deviceId.uri().getScheme());
}
/**
* Returns the provider registered with the specified scheme.
*
* @param scheme provider scheme
* @return provider
*/
protected synchronized P getProvider(String scheme) {
return providersByScheme.get(scheme);
}
}
......
......@@ -42,7 +42,6 @@ import org.onosproject.event.AbstractListenerManager;
import org.slf4j.Logger;
import com.google.common.collect.Collections2;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.util.ArrayList;
......@@ -139,11 +138,7 @@ public class ClusterManager
checkNotNull(nodes, "Nodes cannot be null");
checkArgument(!nodes.isEmpty(), "Nodes cannot be empty");
ClusterMetadata metadata = ClusterMetadata.builder()
.withName("default")
.withControllerNodes(nodes)
.withPartitions(buildDefaultPartitions(nodes))
.build();
ClusterMetadata metadata = new ClusterMetadata("default", nodes, buildDefaultPartitions(nodes));
clusterMetadataAdminService.setClusterMetadata(metadata);
try {
log.warn("Shutting down container for cluster reconfiguration!");
......@@ -175,10 +170,10 @@ public class ClusterManager
}
}
private static Collection<Partition> buildDefaultPartitions(Collection<ControllerNode> nodes) {
private static Set<Partition> buildDefaultPartitions(Collection<ControllerNode> nodes) {
List<ControllerNode> sorted = new ArrayList<>(nodes);
Collections.sort(sorted, (o1, o2) -> o1.id().toString().compareTo(o2.id().toString()));
Collection<Partition> partitions = Lists.newArrayList();
Set<Partition> partitions = Sets.newHashSet();
// add p0 partition
partitions.add(new DefaultPartition(PartitionId.from(0),
Sets.newHashSet(Collections2.transform(nodes, ControllerNode::id))));
......
......@@ -21,81 +21,117 @@ import static org.slf4j.LoggerFactory.getLogger;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.Enumeration;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.packet.IpAddress;
import org.onosproject.cluster.ClusterMetadata;
import org.onosproject.cluster.ClusterMetadataAdminService;
import org.onosproject.cluster.ClusterMetadataEvent;
import org.onosproject.cluster.ClusterMetadataEventListener;
import org.onosproject.cluster.ClusterMetadataProvider;
import org.onosproject.cluster.ClusterMetadataProviderRegistry;
import org.onosproject.cluster.ClusterMetadataProviderService;
import org.onosproject.cluster.ClusterMetadataService;
import org.onosproject.cluster.ClusterMetadataStore;
import org.onosproject.cluster.ClusterMetadataStoreDelegate;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.event.AbstractListenerManager;
import org.onosproject.cluster.NodeId;
import org.onosproject.cluster.PartitionId;
import org.onosproject.net.provider.AbstractListenerProviderRegistry;
import org.onosproject.net.provider.AbstractProviderService;
import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
import com.google.common.base.Throwables;
/**
* Implementation of ClusterMetadataService.
*/
@Component(immediate = true)
@Service
public class ClusterMetadataManager
extends AbstractListenerManager<ClusterMetadataEvent, ClusterMetadataEventListener>
implements ClusterMetadataService, ClusterMetadataAdminService {
extends AbstractListenerProviderRegistry<ClusterMetadataEvent,
ClusterMetadataEventListener,
ClusterMetadataProvider,
ClusterMetadataProviderService>
implements ClusterMetadataService, ClusterMetadataAdminService, ClusterMetadataProviderRegistry {
private final Logger log = getLogger(getClass());
private ControllerNode localNode;
private ClusterMetadataStoreDelegate delegate = new InternalStoreDelegate();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterMetadataStore store;
@Activate
public void activate() {
store.setDelegate(delegate);
// FIXME: Need to ensure all cluster metadata providers are registered before we activate
eventDispatcher.addSink(ClusterMetadataEvent.class, listenerRegistry);
establishSelfIdentity();
log.info("Started");
}
@Deactivate
public void deactivate() {
store.unsetDelegate(delegate);
eventDispatcher.removeSink(ClusterMetadataEvent.class);
log.info("Stopped");
}
@Override
public ClusterMetadata getClusterMetadata() {
return Versioned.valueOrElse(store.getClusterMetadata(), null);
Versioned<ClusterMetadata> metadata = getProvider().getClusterMetadata();
return metadata.value();
}
@Override
protected ClusterMetadataProviderService createProviderService(
ClusterMetadataProvider provider) {
return new InternalClusterMetadataProviderService(provider);
}
@Override
public ControllerNode getLocalNode() {
if (localNode == null) {
establishSelfIdentity();
}
return localNode;
}
@Override
public void setClusterMetadata(ClusterMetadata metadata) {
checkNotNull(metadata, "Cluster metadata cannot be null");
store.setClusterMetadata(metadata);
ClusterMetadataProvider primaryProvider = getPrimaryProvider();
if (primaryProvider == null) {
throw new IllegalStateException("Missing primary provider. Cannot update cluster metadata");
}
primaryProvider.setClusterMetadata(metadata);
}
// Store delegate to re-post events emitted from the store.
private class InternalStoreDelegate implements ClusterMetadataStoreDelegate {
@Override
public void notify(ClusterMetadataEvent event) {
post(event);
/**
* Returns the provider to use for fetching cluster metadata.
* @return cluster metadata provider
*/
private ClusterMetadataProvider getProvider() {
ClusterMetadataProvider primaryProvider = getPrimaryProvider();
if (primaryProvider != null && primaryProvider.isAvailable()) {
return primaryProvider;
}
log.warn("Primary cluster metadata provider not available. Using default fallback.");
return getProvider("default");
}
/**
* Returns the primary provider for cluster metadata.
* @return primary cluster metadata provider
*/
private ClusterMetadataProvider getPrimaryProvider() {
try {
URI uri = new URI(System.getProperty("onos.cluster.metadata.uri", "config:///cluster.json"));
return getProvider(uri.getScheme());
} catch (URISyntaxException e) {
Throwables.propagate(e);
return null;
}
}
......@@ -129,4 +165,25 @@ public class ClusterMetadataManager
throw new IllegalStateException("Cannot determine local IP", e);
}
}
private class InternalClusterMetadataProviderService
extends AbstractProviderService<ClusterMetadataProvider>
implements ClusterMetadataProviderService {
InternalClusterMetadataProviderService(ClusterMetadataProvider provider) {
super(provider);
}
@Override
public void clusterMetadataChanged(Versioned<ClusterMetadata> newMetadata) {
log.info("Cluster metadata changed. New metadata: {}", newMetadata);
post(new ClusterMetadataEvent(ClusterMetadataEvent.Type.METADATA_CHANGED, newMetadata.value()));
}
@Override
public void newActiveMemberForPartition(PartitionId partitionId, NodeId nodeId) {
log.info("Node {} is active member for partition {}", nodeId, partitionId);
// TODO: notify listeners
}
}
}
\ No newline at end of file
......
/*
* Copyright 2015-2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.cluster.impl;
import static java.net.NetworkInterface.getNetworkInterfaces;
import static org.slf4j.LoggerFactory.getLogger;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.packet.IpAddress;
import org.onosproject.cluster.ClusterMetadata;
import org.onosproject.cluster.ClusterMetadataProvider;
import org.onosproject.cluster.ClusterMetadataProviderRegistry;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.DefaultPartition;
import org.onosproject.cluster.NodeId;
import org.onosproject.cluster.Partition;
import org.onosproject.cluster.PartitionId;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
/**
* Provider of default {@link ClusterMetadata cluster metadata}.
*/
@Component(immediate = true)
public class DefaultClusterMetadataProvider implements ClusterMetadataProvider {
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterMetadataProviderRegistry providerRegistry;
private static final String ONOS_IP = "ONOS_IP";
private static final String ONOS_INTERFACE = "ONOS_INTERFACE";
private static final String ONOS_ALLOW_IPV6 = "ONOS_ALLOW_IPV6";
private static final String DEFAULT_ONOS_INTERFACE = "eth0";
private static final int DEFAULT_ONOS_PORT = 9876;
private static final ProviderId PROVIDER_ID = new ProviderId("default", "none");
private AtomicReference<Versioned<ClusterMetadata>> cachedMetadata = new AtomicReference<>();
@Activate
public void activate() {
String localIp = getSiteLocalAddress();
ControllerNode localNode =
new DefaultControllerNode(new NodeId(localIp), IpAddress.valueOf(localIp), DEFAULT_ONOS_PORT);
// p0 partition
Partition basePartition = new DefaultPartition(PartitionId.from(0), Sets.newHashSet(localNode.id()));
// p1 partition
Partition extendedPartition = new DefaultPartition(PartitionId.from(1), Sets.newHashSet(localNode.id()));
ClusterMetadata metadata = new ClusterMetadata(PROVIDER_ID,
"default",
ImmutableSet.of(localNode),
ImmutableSet.of(basePartition, extendedPartition));
long version = System.currentTimeMillis();
cachedMetadata.set(new Versioned<>(metadata, version));
providerRegistry.register(this);
log.info("Started");
}
@Deactivate
public void deactivate() {
providerRegistry.unregister(this);
log.info("Stopped");
}
@Override
public ProviderId id() {
return PROVIDER_ID;
}
@Override
public Versioned<ClusterMetadata> getClusterMetadata() {
return cachedMetadata.get();
}
@Override
public void setClusterMetadata(ClusterMetadata metadata) {
throw new UnsupportedOperationException();
}
@Override
public void addActivePartitionMember(PartitionId partitionId, NodeId nodeId) {
throw new UnsupportedOperationException();
}
@Override
public void removeActivePartitionMember(PartitionId partitionId, NodeId nodeId) {
throw new UnsupportedOperationException();
}
@Override
public Set<NodeId> getActivePartitionMembers(PartitionId partitionId) {
throw new UnsupportedOperationException();
}
@Override
public boolean isAvailable() {
return true;
}
private static String getSiteLocalAddress() {
/*
* If the IP ONOS should use is set via the environment variable we will assume it is valid and should be used.
* Setting the IP address takes presidence over setting the interface via the environment.
*/
String useOnosIp = System.getenv(ONOS_IP);
if (useOnosIp != null) {
return useOnosIp;
}
// Read environment variables for IP interface information or set to default
String useOnosInterface = System.getenv(ONOS_INTERFACE);
if (useOnosInterface == null) {
useOnosInterface = DEFAULT_ONOS_INTERFACE;
}
// Capture if they want to limit IP address selection to only IPv4 (default).
boolean allowIPv6 = (System.getenv(ONOS_ALLOW_IPV6) != null);
Function<NetworkInterface, IpAddress> ipLookup = nif -> {
IpAddress fallback = null;
// nif can be null if the interface name specified doesn't exist on the node's host
if (nif != null) {
for (InetAddress address : Collections.list(nif.getInetAddresses())) {
if (address.isSiteLocalAddress() && (allowIPv6 || address instanceof Inet4Address)) {
return IpAddress.valueOf(address);
}
if (fallback == null && !address.isLoopbackAddress() && !address.isMulticastAddress()
&& (allowIPv6 || address instanceof Inet4Address)) {
fallback = IpAddress.valueOf(address);
}
}
}
return fallback;
};
try {
IpAddress ip = ipLookup.apply(NetworkInterface.getByName(useOnosInterface));
if (ip != null) {
return ip.toString();
}
for (NetworkInterface nif : Collections.list(getNetworkInterfaces())) {
if (!nif.getName().equals(useOnosInterface)) {
ip = ipLookup.apply(nif);
if (ip != null) {
return ip.toString();
}
}
}
} catch (Exception e) {
throw new IllegalStateException("Unable to get network interfaces", e);
}
return IpAddress.valueOf(InetAddress.getLoopbackAddress()).toString();
}
}
\ No newline at end of file
......@@ -954,10 +954,9 @@ public class LldpLinkProviderTest {
final NodeId nid = new NodeId("test-node");
final IpAddress addr = IpAddress.valueOf(0);
final Partition p = new DefaultPartition(PartitionId.from(1), Sets.newHashSet(nid));
return ClusterMetadata.builder()
.withName("test-cluster")
.withControllerNodes(Sets.newHashSet(new DefaultControllerNode(nid, addr)))
.withPartitions(Sets.newHashSet(p)).build();
return new ClusterMetadata("test-cluster",
Sets.newHashSet(new DefaultControllerNode(nid, addr)),
Sets.newHashSet(p));
}
@Override
......