Madan Jampani
Committed by Gerrit Code Review

First round of ClusterMetadata improvements:

    Introduced a PartitionId type for identifying partitions
    Introduced a admin service for making metadata updates
    Update cluster.json format to specify all partitions (including p0) and changed partitionId to be an int.

Change-Id: Ia0617f1ed0ce886680dcee4f5396a4bbdfa225da
...@@ -20,6 +20,8 @@ import java.util.Collection; ...@@ -20,6 +20,8 @@ import java.util.Collection;
20 import java.util.Set; 20 import java.util.Set;
21 import java.util.stream.Collectors; 21 import java.util.stream.Collectors;
22 22
23 +import org.apache.commons.collections.CollectionUtils;
24 +
23 import static com.google.common.base.Preconditions.checkNotNull; 25 import static com.google.common.base.Preconditions.checkNotNull;
24 import static com.google.common.base.Verify.verifyNotNull; 26 import static com.google.common.base.Verify.verifyNotNull;
25 import static com.google.common.base.Verify.verify; 27 import static com.google.common.base.Verify.verify;
...@@ -32,7 +34,7 @@ import com.google.common.collect.Sets; ...@@ -32,7 +34,7 @@ import com.google.common.collect.Sets;
32 /** 34 /**
33 * Cluster metadata. 35 * Cluster metadata.
34 * <p> 36 * <p>
35 - * Metadata specifies the attributes that define a ONOS cluster and comprises the collection 37 + * Metadata specifies how a ONOS cluster is constituted and is made up of the collection
36 * of {@link org.onosproject.cluster.ControllerNode nodes} and the collection of data 38 * of {@link org.onosproject.cluster.ControllerNode nodes} and the collection of data
37 * {@link org.onosproject.cluster.Partition partitions}. 39 * {@link org.onosproject.cluster.Partition partitions}.
38 */ 40 */
...@@ -71,7 +73,8 @@ public final class ClusterMetadata { ...@@ -71,7 +73,8 @@ public final class ClusterMetadata {
71 } 73 }
72 74
73 /** 75 /**
74 - * Returns the collection of data {@link org.onosproject.cluster.Partition partitions} that make up the cluster. 76 + * Returns the collection of {@link org.onosproject.cluster.Partition partitions} that make
77 + * up the cluster.
75 * @return collection of partitions. 78 * @return collection of partitions.
76 */ 79 */
77 public Collection<Partition> getPartitions() { 80 public Collection<Partition> getPartitions() {
...@@ -93,7 +96,7 @@ public final class ClusterMetadata { ...@@ -93,7 +96,7 @@ public final class ClusterMetadata {
93 } 96 }
94 97
95 /* 98 /*
96 - * Provide a deep quality check of the meta data (non-Javadoc) 99 + * Provide a deep equality check of the cluster metadata (non-Javadoc)
97 * 100 *
98 * @see java.lang.Object#equals(java.lang.Object) 101 * @see java.lang.Object#equals(java.lang.Object)
99 */ 102 */
...@@ -146,7 +149,7 @@ public final class ClusterMetadata { ...@@ -146,7 +149,7 @@ public final class ClusterMetadata {
146 } 149 }
147 150
148 /** 151 /**
149 - * Sets the collection of data partitions, returning the cluster metadata builder for method chaining. 152 + * Sets the partitions, returning the cluster metadata builder for method chaining.
150 * @param partitions collection of partitions 153 * @param partitions collection of partitions
151 * @return this cluster metadata builder 154 * @return this cluster metadata builder
152 */ 155 */
...@@ -171,10 +174,8 @@ public final class ClusterMetadata { ...@@ -171,10 +174,8 @@ public final class ClusterMetadata {
171 */ 174 */
172 private void verifyMetadata() { 175 private void verifyMetadata() {
173 verifyNotNull(metadata.getName(), "Cluster name must be specified"); 176 verifyNotNull(metadata.getName(), "Cluster name must be specified");
174 - verifyNotNull(metadata.getNodes(), "Cluster nodes must be specified"); 177 + verify(CollectionUtils.isEmpty(metadata.getNodes()), "Cluster nodes must be specified");
175 - verifyNotNull(metadata.getPartitions(), "Cluster partitions must be specified"); 178 + verify(CollectionUtils.isEmpty(metadata.getPartitions()), "Cluster partitions must be specified");
176 - verify(!metadata.getNodes().isEmpty(), "Cluster nodes must not be empty");
177 - verify(!metadata.getPartitions().isEmpty(), "Cluster nodes must not be empty");
178 179
179 // verify that partitions are constituted from valid cluster nodes. 180 // verify that partitions are constituted from valid cluster nodes.
180 boolean validPartitions = Collections2.transform(metadata.getNodes(), ControllerNode::id) 181 boolean validPartitions = Collections2.transform(metadata.getNodes(), ControllerNode::id)
......
1 +/*
2 + * Copyright 2016 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.cluster;
17 +
18 +/**
19 + * Service for making updates to {@link ClusterMetadata cluster metadata}.
20 + */
21 +public interface ClusterMetadataAdminService {
22 +
23 + /**
24 + * Updates the cluster metadata.
25 + * @param metadata new metadata
26 + */
27 + void setClusterMetadata(ClusterMetadata metadata);
28 +}
...@@ -15,10 +15,13 @@ ...@@ -15,10 +15,13 @@
15 */ 15 */
16 package org.onosproject.cluster; 16 package org.onosproject.cluster;
17 17
18 +import org.onosproject.event.ListenerService;
19 +
18 /** 20 /**
19 - * Service for obtaining metadata information about the cluster. 21 + * Service for accessing {@link ClusterMetadata cluster metadata}.
20 */ 22 */
21 -public interface ClusterMetadataService { 23 +public interface ClusterMetadataService
24 + extends ListenerService<ClusterMetadataEvent, ClusterMetadataEventListener> {
22 25
23 /** 26 /**
24 * Returns the current cluster metadata. 27 * Returns the current cluster metadata.
...@@ -27,13 +30,7 @@ public interface ClusterMetadataService { ...@@ -27,13 +30,7 @@ public interface ClusterMetadataService {
27 ClusterMetadata getClusterMetadata(); 30 ClusterMetadata getClusterMetadata();
28 31
29 /** 32 /**
30 - * Updates the cluster metadata. 33 + * Returns the {@link ControllerNode controller node} representing this instance.
31 - * @param metadata new metadata
32 - */
33 - void setClusterMetadata(ClusterMetadata metadata);
34 -
35 - /**
36 - * Returns the local controller node representing this instance.
37 * @return local controller node 34 * @return local controller node
38 */ 35 */
39 ControllerNode getLocalNode(); 36 ControllerNode getLocalNode();
......
...@@ -21,14 +21,14 @@ import org.onosproject.store.Store; ...@@ -21,14 +21,14 @@ import org.onosproject.store.Store;
21 import org.onosproject.store.service.Versioned; 21 import org.onosproject.store.service.Versioned;
22 22
23 /** 23 /**
24 - * Manages persistence of cluster metadata; not intended for direct use. 24 + * Manages persistence of {@link ClusterMetadata cluster metadata}; not intended for direct use.
25 */ 25 */
26 public interface ClusterMetadataStore extends Store<ClusterMetadataEvent, ClusterMetadataStoreDelegate> { 26 public interface ClusterMetadataStore extends Store<ClusterMetadataEvent, ClusterMetadataStoreDelegate> {
27 27
28 /** 28 /**
29 * Returns the cluster metadata. 29 * Returns the cluster metadata.
30 * <p> 30 * <p>
31 - * The returned metadata is versioned to aid determining if a metadata instance is more recent than another. 31 + * The retuned metadata is encapsulated as a {@link Versioned versioned} and therefore has a specific version.
32 * @return cluster metadata 32 * @return cluster metadata
33 */ 33 */
34 Versioned<ClusterMetadata> getClusterMetadata(); 34 Versioned<ClusterMetadata> getClusterMetadata();
...@@ -39,39 +39,38 @@ public interface ClusterMetadataStore extends Store<ClusterMetadataEvent, Cluste ...@@ -39,39 +39,38 @@ public interface ClusterMetadataStore extends Store<ClusterMetadataEvent, Cluste
39 */ 39 */
40 void setClusterMetadata(ClusterMetadata metadata); 40 void setClusterMetadata(ClusterMetadata metadata);
41 41
42 - // TODO: The below methods should move to a separate store interface that is responsible for
43 - // tracking cluster partition operational state.
44 -
45 /** 42 /**
46 - * Sets a controller node as an active member of a partition. 43 + * Adds a controller node to the list of active members for a partition.
47 * <p> 44 * <p>
48 - * Active members are those replicas that are up to speed with the rest of the system and are 45 + * Active members of a partition are those that are actively participating
49 - * usually capable of participating in the replica state management activities in accordance with 46 + * in the data replication protocol being employed. When a node first added
50 - * the data consistency and replication protocol in use. 47 + * to a partition, it is in a passive or catch up mode where it attempts to
48 + * bring it self up to speed with other active members in the partition.
51 * @param partitionId partition identifier 49 * @param partitionId partition identifier
52 - * @param nodeId id of controller node 50 + * @param nodeId identifier of controller node
53 */ 51 */
54 - void setActiveReplica(String partitionId, NodeId nodeId); 52 + void addActivePartitionMember(PartitionId partitionId, NodeId nodeId);
55 53
56 /** 54 /**
57 - * Removes a controller node as an active member for a partition. 55 + * Removes a controller node from the list of active members for a partition.
58 - * <p>
59 - * Active members are those replicas that are up to speed with the rest of the system and are
60 - * usually capable of participating in the replica state management activities in accordance with
61 - * the data consistency and replication protocol in use.
62 * @param partitionId partition identifier 56 * @param partitionId partition identifier
63 * @param nodeId id of controller node 57 * @param nodeId id of controller node
64 */ 58 */
65 - void unsetActiveReplica(String partitionId, NodeId nodeId); 59 + void removeActivePartitionMember(PartitionId partitionId, NodeId nodeId);
66 60
67 /** 61 /**
68 - * Returns the collection of controller nodes that are the active replicas for a partition. 62 + * Returns the collection of controller nodes that are the active members for a partition.
63 + * <p>
64 + * Active members of a partition are typically those that are actively
65 + * participating in the data replication protocol being employed. When
66 + * a node first added to a partition, it is in a passive or catch up mode where
67 + * it attempts to bring it self up to speed with other active members in the partition.
69 * <p> 68 * <p>
70 - * Active members are those replicas that are up to speed with the rest of the system and are 69 + * <b>Note:</b>If is possible for this list to different from the list of partition members
71 - * usually capable of participating in the replica state management activities in accordance with 70 + * specified by cluster meta data. The discrepancy can arise due to the fact that
72 - * the data consistency and replication protocol in use. 71 + * adding/removing members from a partition requires a data hand-off mechanism to complete.
73 * @param partitionId partition identifier 72 * @param partitionId partition identifier
74 - * @return identifiers of controller nodes that are the active replicas 73 + * @return identifiers of controller nodes that are active members
75 */ 74 */
76 - Collection<NodeId> getActiveReplicas(String partitionId); 75 + Collection<NodeId> getActivePartitionMembers(PartitionId partitionId);
77 } 76 }
...\ No newline at end of file ...\ No newline at end of file
......
1 /* 1 /*
2 - * Copyright 2015 Open Networking Laboratory 2 + * Copyright 2016 Open Networking Laboratory
3 * 3 *
4 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
...@@ -15,77 +15,23 @@ ...@@ -15,77 +15,23 @@
15 */ 15 */
16 package org.onosproject.cluster; 16 package org.onosproject.cluster;
17 17
18 -import java.util.Arrays;
19 import java.util.Collection; 18 import java.util.Collection;
20 -import java.util.Set;
21 -
22 -import com.google.common.collect.ImmutableSet;
23 -import com.google.common.collect.Sets;
24 -
25 -import static com.google.common.base.Preconditions.checkNotNull;
26 19
27 /** 20 /**
28 - * A data partition. 21 + * A partition or shard is a group of controller nodes that are work together to maintain state.
29 - * <p> 22 + * A ONOS cluster is typically made of of one or partitions over which the the data is partitioned.
30 - * Partition represents a slice of the data space and is made up of a collection
31 - * of {@link org.onosproject.cluster.ControllerNode nodes}
32 - * that all maintain copies of this data.
33 */ 23 */
34 -public class Partition { 24 +public interface Partition {
35 -
36 - private final String name;
37 - private final Set<NodeId> members;
38 -
39 - private Partition() {
40 - name = null;
41 - members = null;
42 - }
43 -
44 - public Partition(String name, Collection<NodeId> members) {
45 - this.name = checkNotNull(name);
46 - this.members = ImmutableSet.copyOf(checkNotNull(members));
47 - }
48 25
49 /** 26 /**
50 - * Returns the partition name. 27 + * Returns the partition identifier.
51 - * <p> 28 + * @return partition identifier
52 - * Each partition is identified by a unique name.
53 - * @return partition name
54 */ 29 */
55 - public String getName() { 30 + PartitionId getId();
56 - return this.name;
57 - }
58 31
59 /** 32 /**
60 - * Returns the collection of controller node identifiers that make up this partition. 33 + * Returns the controller nodes that are members of this partition.
61 * @return collection of controller node identifiers 34 * @return collection of controller node identifiers
62 */ 35 */
63 - public Collection<NodeId> getMembers() { 36 + Collection<NodeId> getMembers();
64 - return this.members;
65 - }
66 -
67 - @Override
68 - public int hashCode() {
69 - return Arrays.deepHashCode(new Object[] {name, members});
70 - }
71 -
72 - @Override
73 - public boolean equals(Object other) {
74 - if (this == other) {
75 - return true;
76 - }
77 -
78 - if (other == null || !Partition.class.isInstance(other)) {
79 - return false;
80 - }
81 -
82 - Partition that = (Partition) other;
83 -
84 - if (!this.name.equals(that.name) || (this.members == null && that.members != null)
85 - || (this.members != null && that.members == null) || this.members.size() != that.members.size()) {
86 - return false;
87 - }
88 -
89 - return Sets.symmetricDifference(this.members, that.members).isEmpty();
90 - }
91 } 37 }
......
1 +/*
2 + * Copyright 2016 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.cluster;
17 +
18 +import static com.google.common.base.Preconditions.checkArgument;
19 +
20 +import java.util.Objects;
21 +
22 +/**
23 + * {@link Partition} identifier.
24 + */
25 +public class PartitionId implements Comparable<PartitionId> {
26 +
27 + private final int id;
28 +
29 + /**
30 + * Creates a partition identifier from an integer.
31 + *
32 + * @param id input integer
33 + */
34 + public PartitionId(int id) {
35 + checkArgument(id >= 0, "partition id must be non-negative");
36 + this.id = id;
37 + }
38 +
39 + /**
40 + * Creates a partition identifier from an integer.
41 + *
42 + * @param id input integer
43 + */
44 + public static PartitionId from(int id) {
45 + return new PartitionId(id);
46 + }
47 +
48 + /**
49 + * Returns the partition identifier as an integer.
50 + * @return number
51 + */
52 + public int asInt() {
53 + return id;
54 + }
55 +
56 + @Override
57 + public int hashCode() {
58 + return id;
59 + }
60 +
61 + @Override
62 + public boolean equals(Object obj) {
63 + if (this == obj) {
64 + return true;
65 + }
66 + if (obj instanceof PartitionId) {
67 + final PartitionId other = (PartitionId) obj;
68 + return Objects.equals(this.id, other.id);
69 + }
70 + return false;
71 + }
72 +
73 + @Override
74 + public String toString() {
75 + return String.valueOf(id);
76 + }
77 +
78 + @Override
79 + public int compareTo(PartitionId that) {
80 + return Integer.compare(this.id, that.id);
81 + }
82 +}
...\ No newline at end of file ...\ No newline at end of file
...@@ -28,6 +28,7 @@ import org.onosproject.cluster.ClusterAdminService; ...@@ -28,6 +28,7 @@ import org.onosproject.cluster.ClusterAdminService;
28 import org.onosproject.cluster.ClusterEvent; 28 import org.onosproject.cluster.ClusterEvent;
29 import org.onosproject.cluster.ClusterEventListener; 29 import org.onosproject.cluster.ClusterEventListener;
30 import org.onosproject.cluster.ClusterMetadata; 30 import org.onosproject.cluster.ClusterMetadata;
31 +import org.onosproject.cluster.ClusterMetadataAdminService;
31 import org.onosproject.cluster.ClusterMetadataService; 32 import org.onosproject.cluster.ClusterMetadataService;
32 import org.onosproject.cluster.ClusterService; 33 import org.onosproject.cluster.ClusterService;
33 import org.onosproject.cluster.ClusterStore; 34 import org.onosproject.cluster.ClusterStore;
...@@ -35,10 +36,13 @@ import org.onosproject.cluster.ClusterStoreDelegate; ...@@ -35,10 +36,13 @@ import org.onosproject.cluster.ClusterStoreDelegate;
35 import org.onosproject.cluster.ControllerNode; 36 import org.onosproject.cluster.ControllerNode;
36 import org.onosproject.cluster.NodeId; 37 import org.onosproject.cluster.NodeId;
37 import org.onosproject.cluster.Partition; 38 import org.onosproject.cluster.Partition;
39 +import org.onosproject.cluster.PartitionId;
38 import org.onosproject.event.AbstractListenerManager; 40 import org.onosproject.event.AbstractListenerManager;
39 import org.slf4j.Logger; 41 import org.slf4j.Logger;
40 42
43 +import com.google.common.collect.Collections2;
41 import com.google.common.collect.Lists; 44 import com.google.common.collect.Lists;
45 +import com.google.common.collect.Sets;
42 46
43 import java.util.ArrayList; 47 import java.util.ArrayList;
44 import java.util.Collection; 48 import java.util.Collection;
...@@ -71,6 +75,9 @@ public class ClusterManager ...@@ -71,6 +75,9 @@ public class ClusterManager
71 protected ClusterMetadataService clusterMetadataService; 75 protected ClusterMetadataService clusterMetadataService;
72 76
73 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 77 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
78 + protected ClusterMetadataAdminService clusterMetadataAdminService;
79 +
80 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
74 protected ClusterStore store; 81 protected ClusterStore store;
75 82
76 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 83 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
...@@ -136,7 +143,7 @@ public class ClusterManager ...@@ -136,7 +143,7 @@ public class ClusterManager
136 .withControllerNodes(nodes) 143 .withControllerNodes(nodes)
137 .withPartitions(buildDefaultPartitions(nodes)) 144 .withPartitions(buildDefaultPartitions(nodes))
138 .build(); 145 .build();
139 - clusterMetadataService.setClusterMetadata(metadata); 146 + clusterMetadataAdminService.setClusterMetadata(metadata);
140 try { 147 try {
141 log.warn("Shutting down container for cluster reconfiguration!"); 148 log.warn("Shutting down container for cluster reconfiguration!");
142 systemService.reboot("now", SystemService.Swipe.NONE); 149 systemService.reboot("now", SystemService.Swipe.NONE);
...@@ -171,15 +178,36 @@ public class ClusterManager ...@@ -171,15 +178,36 @@ public class ClusterManager
171 List<ControllerNode> sorted = new ArrayList<>(nodes); 178 List<ControllerNode> sorted = new ArrayList<>(nodes);
172 Collections.sort(sorted, (o1, o2) -> o1.id().toString().compareTo(o2.id().toString())); 179 Collections.sort(sorted, (o1, o2) -> o1.id().toString().compareTo(o2.id().toString()));
173 Collection<Partition> partitions = Lists.newArrayList(); 180 Collection<Partition> partitions = Lists.newArrayList();
174 - 181 + // add p0 partition
182 + partitions.add(new Partition() {
183 + @Override
184 + public PartitionId getId() {
185 + return PartitionId.from((0));
186 + }
187 + @Override
188 + public Collection<NodeId> getMembers() {
189 + return Sets.newHashSet(Collections2.transform(nodes, ControllerNode::id));
190 + }
191 + });
192 + // add extended partitions
175 int length = nodes.size(); 193 int length = nodes.size();
176 int count = 3; 194 int count = 3;
177 for (int i = 0; i < length; i++) { 195 for (int i = 0; i < length; i++) {
196 + int index = i;
178 Set<NodeId> set = new HashSet<>(count); 197 Set<NodeId> set = new HashSet<>(count);
179 for (int j = 0; j < count; j++) { 198 for (int j = 0; j < count; j++) {
180 set.add(sorted.get((i + j) % length).id()); 199 set.add(sorted.get((i + j) % length).id());
181 } 200 }
182 - partitions.add(new Partition("p" + (i + 1), set)); 201 + partitions.add(new Partition() {
202 + @Override
203 + public PartitionId getId() {
204 + return PartitionId.from((index + 1));
205 + }
206 + @Override
207 + public Collection<NodeId> getMembers() {
208 + return set;
209 + }
210 + });
183 } 211 }
184 return partitions; 212 return partitions;
185 } 213 }
......
1 +/*
2 + * Copyright 2015-2016 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
1 package org.onosproject.cluster.impl; 16 package org.onosproject.cluster.impl;
2 17
3 import static com.google.common.base.Preconditions.checkNotNull; 18 import static com.google.common.base.Preconditions.checkNotNull;
...@@ -17,6 +32,7 @@ import org.apache.felix.scr.annotations.ReferenceCardinality; ...@@ -17,6 +32,7 @@ import org.apache.felix.scr.annotations.ReferenceCardinality;
17 import org.apache.felix.scr.annotations.Service; 32 import org.apache.felix.scr.annotations.Service;
18 import org.onlab.packet.IpAddress; 33 import org.onlab.packet.IpAddress;
19 import org.onosproject.cluster.ClusterMetadata; 34 import org.onosproject.cluster.ClusterMetadata;
35 +import org.onosproject.cluster.ClusterMetadataAdminService;
20 import org.onosproject.cluster.ClusterMetadataEvent; 36 import org.onosproject.cluster.ClusterMetadataEvent;
21 import org.onosproject.cluster.ClusterMetadataEventListener; 37 import org.onosproject.cluster.ClusterMetadataEventListener;
22 import org.onosproject.cluster.ClusterMetadataService; 38 import org.onosproject.cluster.ClusterMetadataService;
...@@ -34,10 +50,10 @@ import org.slf4j.Logger; ...@@ -34,10 +50,10 @@ import org.slf4j.Logger;
34 @Service 50 @Service
35 public class ClusterMetadataManager 51 public class ClusterMetadataManager
36 extends AbstractListenerManager<ClusterMetadataEvent, ClusterMetadataEventListener> 52 extends AbstractListenerManager<ClusterMetadataEvent, ClusterMetadataEventListener>
37 - implements ClusterMetadataService { 53 + implements ClusterMetadataService, ClusterMetadataAdminService {
38 54
39 - private ControllerNode localNode;
40 private final Logger log = getLogger(getClass()); 55 private final Logger log = getLogger(getClass());
56 + private ControllerNode localNode;
41 57
42 private ClusterMetadataStoreDelegate delegate = new InternalStoreDelegate(); 58 private ClusterMetadataStoreDelegate delegate = new InternalStoreDelegate();
43 59
......
1 +/*
2 + * Copyright 2015-2016 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.store.cluster.impl;
17 +
18 +import static com.google.common.base.Preconditions.checkNotNull;
19 +
20 +import java.util.Collection;
21 +import java.util.Objects;
22 +
23 +import org.onosproject.cluster.NodeId;
24 +import org.onosproject.cluster.Partition;
25 +import org.onosproject.cluster.PartitionId;
26 +
27 +import com.google.common.base.MoreObjects;
28 +import com.google.common.collect.ImmutableSet;
29 +import com.google.common.collect.Sets;
30 +
31 +/**
32 + * Default {@link Partition} implementation.
33 + */
34 +public class DefaultPartition implements Partition {
35 +
36 + private final PartitionId id;
37 + private final Collection<NodeId> members;
38 +
39 + private DefaultPartition() {
40 + id = null;
41 + members = null;
42 + }
43 +
44 + public DefaultPartition(PartitionId id, Collection<NodeId> members) {
45 + this.id = checkNotNull(id);
46 + this.members = ImmutableSet.copyOf(members);
47 + }
48 +
49 + @Override
50 + public PartitionId getId() {
51 + return this.id;
52 + }
53 +
54 + @Override
55 + public Collection<NodeId> getMembers() {
56 + return this.members;
57 + }
58 +
59 + @Override
60 + public String toString() {
61 + return MoreObjects.toStringHelper(getClass())
62 + .add("id", id)
63 + .add("members", members)
64 + .toString();
65 + }
66 +
67 + @Override
68 + public int hashCode() {
69 + return Objects.hash(id, members);
70 + }
71 +
72 + @Override
73 + public boolean equals(Object other) {
74 + if (!(other instanceof DefaultPartition)) {
75 + return false;
76 + }
77 + DefaultPartition that = (DefaultPartition) other;
78 + return this.getId().equals(that.getId()) &&
79 + Sets.symmetricDifference(Sets.newHashSet(this.members), Sets.newHashSet(that.members)).isEmpty();
80 + }
81 +}
...\ No newline at end of file ...\ No newline at end of file
1 +/*
2 + * Copyright 2015-2016 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
1 package org.onosproject.store.cluster.impl; 16 package org.onosproject.store.cluster.impl;
2 17
3 import static com.google.common.base.Preconditions.checkNotNull; 18 import static com.google.common.base.Preconditions.checkNotNull;
...@@ -28,6 +43,7 @@ import org.onosproject.cluster.ControllerNode; ...@@ -28,6 +43,7 @@ import org.onosproject.cluster.ControllerNode;
28 import org.onosproject.cluster.DefaultControllerNode; 43 import org.onosproject.cluster.DefaultControllerNode;
29 import org.onosproject.cluster.NodeId; 44 import org.onosproject.cluster.NodeId;
30 import org.onosproject.cluster.Partition; 45 import org.onosproject.cluster.Partition;
46 +import org.onosproject.cluster.PartitionId;
31 import org.onosproject.store.AbstractStore; 47 import org.onosproject.store.AbstractStore;
32 import org.onosproject.store.service.Versioned; 48 import org.onosproject.store.service.Versioned;
33 import org.slf4j.Logger; 49 import org.slf4j.Logger;
...@@ -44,6 +60,7 @@ import com.fasterxml.jackson.databind.SerializerProvider; ...@@ -44,6 +60,7 @@ import com.fasterxml.jackson.databind.SerializerProvider;
44 import com.fasterxml.jackson.databind.module.SimpleModule; 60 import com.fasterxml.jackson.databind.module.SimpleModule;
45 import com.google.common.base.Throwables; 61 import com.google.common.base.Throwables;
46 import com.google.common.collect.Lists; 62 import com.google.common.collect.Lists;
63 +import com.google.common.collect.Sets;
47 import com.google.common.io.Files; 64 import com.google.common.io.Files;
48 65
49 /** 66 /**
...@@ -76,6 +93,7 @@ public class StaticClusterMetadataStore ...@@ -76,6 +93,7 @@ public class StaticClusterMetadataStore
76 module.addDeserializer(NodeId.class, new NodeIdDeserializer()); 93 module.addDeserializer(NodeId.class, new NodeIdDeserializer());
77 module.addSerializer(ControllerNode.class, new ControllerNodeSerializer()); 94 module.addSerializer(ControllerNode.class, new ControllerNodeSerializer());
78 module.addDeserializer(ControllerNode.class, new ControllerNodeDeserializer()); 95 module.addDeserializer(ControllerNode.class, new ControllerNodeDeserializer());
96 + module.addDeserializer(Partition.class, new PartitionDeserializer());
79 mapper.registerModule(module); 97 mapper.registerModule(module);
80 File metadataFile = new File(CLUSTER_METADATA_FILE); 98 File metadataFile = new File(CLUSTER_METADATA_FILE);
81 if (metadataFile.exists()) { 99 if (metadataFile.exists()) {
...@@ -89,10 +107,21 @@ public class StaticClusterMetadataStore ...@@ -89,10 +107,21 @@ public class StaticClusterMetadataStore
89 String localIp = getSiteLocalAddress(); 107 String localIp = getSiteLocalAddress();
90 ControllerNode localNode = 108 ControllerNode localNode =
91 new DefaultControllerNode(new NodeId(localIp), IpAddress.valueOf(localIp), DEFAULT_ONOS_PORT); 109 new DefaultControllerNode(new NodeId(localIp), IpAddress.valueOf(localIp), DEFAULT_ONOS_PORT);
110 + Partition defaultPartition = new Partition() {
111 + @Override
112 + public PartitionId getId() {
113 + return PartitionId.from(1);
114 + }
115 +
116 + @Override
117 + public Collection<NodeId> getMembers() {
118 + return Sets.newHashSet(localNode.id());
119 + }
120 + };
92 metadata.set(ClusterMetadata.builder() 121 metadata.set(ClusterMetadata.builder()
93 .withName("default") 122 .withName("default")
94 .withControllerNodes(Arrays.asList(localNode)) 123 .withControllerNodes(Arrays.asList(localNode))
95 - .withPartitions(Lists.newArrayList(new Partition("p1", Lists.newArrayList(localNode.id())))) 124 + .withPartitions(Lists.newArrayList(defaultPartition))
96 .build()); 125 .build());
97 version = System.currentTimeMillis(); 126 version = System.currentTimeMillis();
98 } 127 }
...@@ -138,25 +167,33 @@ public class StaticClusterMetadataStore ...@@ -138,25 +167,33 @@ public class StaticClusterMetadataStore
138 } 167 }
139 168
140 @Override 169 @Override
141 - public void setActiveReplica(String partitionId, NodeId nodeId) { 170 + public void addActivePartitionMember(PartitionId partitionId, NodeId nodeId) {
142 throw new UnsupportedOperationException(); 171 throw new UnsupportedOperationException();
143 } 172 }
144 173
145 @Override 174 @Override
146 - public void unsetActiveReplica(String partitionId, NodeId nodeId) { 175 + public void removeActivePartitionMember(PartitionId partitionId, NodeId nodeId) {
147 throw new UnsupportedOperationException(); 176 throw new UnsupportedOperationException();
148 } 177 }
149 178
150 @Override 179 @Override
151 - public Collection<NodeId> getActiveReplicas(String partitionId) { 180 + public Collection<NodeId> getActivePartitionMembers(PartitionId partitionId) {
152 return metadata.get().getPartitions() 181 return metadata.get().getPartitions()
153 .stream() 182 .stream()
154 - .filter(r -> r.getName().equals(partitionId)) 183 + .filter(r -> r.getId().equals(partitionId))
155 .findFirst() 184 .findFirst()
156 .map(r -> r.getMembers()) 185 .map(r -> r.getMembers())
157 .orElse(null); 186 .orElse(null);
158 } 187 }
159 188
189 + private static class PartitionDeserializer extends JsonDeserializer<Partition> {
190 + @Override
191 + public Partition deserialize(JsonParser jp, DeserializationContext ctxt)
192 + throws IOException, JsonProcessingException {
193 + return jp.readValueAs(DefaultPartition.class);
194 + }
195 + }
196 +
160 private static class ControllerNodeSerializer extends JsonSerializer<ControllerNode> { 197 private static class ControllerNodeSerializer extends JsonSerializer<ControllerNode> {
161 @Override 198 @Override
162 public void serialize(ControllerNode node, JsonGenerator jgen, SerializerProvider provider) 199 public void serialize(ControllerNode node, JsonGenerator jgen, SerializerProvider provider)
......
...@@ -53,6 +53,7 @@ import org.onosproject.cluster.ClusterMetadataService; ...@@ -53,6 +53,7 @@ import org.onosproject.cluster.ClusterMetadataService;
53 import org.onosproject.cluster.ClusterService; 53 import org.onosproject.cluster.ClusterService;
54 import org.onosproject.cluster.ControllerNode; 54 import org.onosproject.cluster.ControllerNode;
55 import org.onosproject.cluster.NodeId; 55 import org.onosproject.cluster.NodeId;
56 +import org.onosproject.cluster.PartitionId;
56 import org.onosproject.core.ApplicationId; 57 import org.onosproject.core.ApplicationId;
57 import org.onosproject.core.IdGenerator; 58 import org.onosproject.core.IdGenerator;
58 import org.onosproject.persistence.PersistenceService; 59 import org.onosproject.persistence.PersistenceService;
...@@ -82,6 +83,7 @@ import java.util.concurrent.ExecutionException; ...@@ -82,6 +83,7 @@ import java.util.concurrent.ExecutionException;
82 import java.util.concurrent.Executors; 83 import java.util.concurrent.Executors;
83 import java.util.concurrent.TimeUnit; 84 import java.util.concurrent.TimeUnit;
84 import java.util.concurrent.TimeoutException; 85 import java.util.concurrent.TimeoutException;
86 +import java.util.function.Function;
85 import java.util.stream.Collectors; 87 import java.util.stream.Collectors;
86 88
87 import static org.slf4j.LoggerFactory.getLogger; 89 import static org.slf4j.LoggerFactory.getLogger;
...@@ -151,12 +153,11 @@ public class DatabaseManager implements StorageService, StorageAdminService { ...@@ -151,12 +153,11 @@ public class DatabaseManager implements StorageService, StorageAdminService {
151 public void activate() { 153 public void activate() {
152 localNodeId = clusterService.getLocalNode().id(); 154 localNodeId = clusterService.getLocalNode().id();
153 155
154 - Map<String, Set<NodeId>> partitionMap = Maps.newHashMap(); 156 + Map<PartitionId, Set<NodeId>> partitionMap = Maps.newHashMap();
155 clusterMetadataService.getClusterMetadata().getPartitions().forEach(p -> { 157 clusterMetadataService.getClusterMetadata().getPartitions().forEach(p -> {
156 - partitionMap.put(p.getName(), Sets.newHashSet(p.getMembers())); 158 + partitionMap.put(p.getId(), Sets.newHashSet(p.getMembers()));
157 }); 159 });
158 160
159 -
160 String[] activeNodeUris = partitionMap.values() 161 String[] activeNodeUris = partitionMap.values()
161 .stream() 162 .stream()
162 .reduce((s1, s2) -> Sets.union(s1, s2)) 163 .reduce((s1, s2) -> Sets.union(s1, s2))
...@@ -183,28 +184,19 @@ public class DatabaseManager implements StorageService, StorageAdminService { ...@@ -183,28 +184,19 @@ public class DatabaseManager implements StorageService, StorageAdminService {
183 184
184 coordinator = new DefaultClusterCoordinator(copycatConfig.resolve()); 185 coordinator = new DefaultClusterCoordinator(copycatConfig.resolve());
185 186
186 - DatabaseConfig inMemoryDatabaseConfig = 187 + Function<PartitionId, Log> logFunction = id -> id.asInt() == 0 ? newInMemoryLog() : newPersistentLog();
187 - newDatabaseConfig(BASE_PARTITION_NAME, newInMemoryLog(), activeNodeUris);
188 - inMemoryDatabase = coordinator
189 - .getResource(inMemoryDatabaseConfig.getName(), inMemoryDatabaseConfig.resolve(clusterConfig)
190 - .withSerializer(copycatConfig.getDefaultSerializer())
191 - .withDefaultExecutor(copycatConfig.getDefaultExecutor()));
192 188
193 - List<Database> partitions = partitionMap.entrySet() 189 + Map<PartitionId, Database> databases = Maps.transformEntries(partitionMap, (k, v) -> {
194 - .stream() 190 + String[] replicas = v.stream().map(this::nodeIdToUri).toArray(String[]::new);
195 - .map(entry -> { 191 + DatabaseConfig config = newDatabaseConfig(String.format("p%s", k), logFunction.apply(k), replicas);
196 - String[] replicas = entry.getValue().stream().map(this::nodeIdToUri).toArray(String[]::new); 192 + return coordinator.getResource(config.getName(), config.resolve(clusterConfig)
197 - return newDatabaseConfig(entry.getKey(), newPersistentLog(), replicas);
198 - })
199 - .map(config -> {
200 - Database db = coordinator.getResource(config.getName(), config.resolve(clusterConfig)
201 .withSerializer(copycatConfig.getDefaultSerializer()) 193 .withSerializer(copycatConfig.getDefaultSerializer())
202 .withDefaultExecutor(copycatConfig.getDefaultExecutor())); 194 .withDefaultExecutor(copycatConfig.getDefaultExecutor()));
203 - return db; 195 + });
204 - }) 196 +
205 - .collect(Collectors.toList()); 197 + inMemoryDatabase = databases.remove(PartitionId.from(0));
206 198
207 - partitionedDatabase = new PartitionedDatabase("onos-store", partitions); 199 + partitionedDatabase = new PartitionedDatabase("onos-store", databases.values());
208 200
209 CompletableFuture<Void> status = coordinator.open() 201 CompletableFuture<Void> status = coordinator.open()
210 .thenCompose(v -> CompletableFuture.allOf(inMemoryDatabase.open(), partitionedDatabase.open()) 202 .thenCompose(v -> CompletableFuture.allOf(inMemoryDatabase.open(), partitionedDatabase.open())
......
...@@ -32,11 +32,13 @@ import org.onlab.packet.IpAddress; ...@@ -32,11 +32,13 @@ import org.onlab.packet.IpAddress;
32 import org.onlab.packet.ONOSLLDP; 32 import org.onlab.packet.ONOSLLDP;
33 import org.onosproject.cfg.ComponentConfigAdapter; 33 import org.onosproject.cfg.ComponentConfigAdapter;
34 import org.onosproject.cluster.ClusterMetadata; 34 import org.onosproject.cluster.ClusterMetadata;
35 +import org.onosproject.cluster.ClusterMetadataEventListener;
35 import org.onosproject.cluster.ClusterMetadataService; 36 import org.onosproject.cluster.ClusterMetadataService;
36 import org.onosproject.cluster.ControllerNode; 37 import org.onosproject.cluster.ControllerNode;
37 import org.onosproject.cluster.DefaultControllerNode; 38 import org.onosproject.cluster.DefaultControllerNode;
38 import org.onosproject.cluster.NodeId; 39 import org.onosproject.cluster.NodeId;
39 import org.onosproject.cluster.Partition; 40 import org.onosproject.cluster.Partition;
41 +import org.onosproject.cluster.PartitionId;
40 import org.onosproject.cluster.RoleInfo; 42 import org.onosproject.cluster.RoleInfo;
41 import org.onosproject.core.ApplicationId; 43 import org.onosproject.core.ApplicationId;
42 import org.onosproject.core.CoreService; 44 import org.onosproject.core.CoreService;
...@@ -77,6 +79,7 @@ import org.onosproject.net.provider.AbstractProviderService; ...@@ -77,6 +79,7 @@ import org.onosproject.net.provider.AbstractProviderService;
77 import org.onosproject.net.provider.ProviderId; 79 import org.onosproject.net.provider.ProviderId;
78 80
79 import java.nio.ByteBuffer; 81 import java.nio.ByteBuffer;
82 +import java.util.Collection;
80 import java.util.HashMap; 83 import java.util.HashMap;
81 import java.util.HashSet; 84 import java.util.HashSet;
82 import java.util.List; 85 import java.util.List;
...@@ -950,7 +953,16 @@ public class LldpLinkProviderTest { ...@@ -950,7 +953,16 @@ public class LldpLinkProviderTest {
950 public ClusterMetadata getClusterMetadata() { 953 public ClusterMetadata getClusterMetadata() {
951 final NodeId nid = new NodeId("test-node"); 954 final NodeId nid = new NodeId("test-node");
952 final IpAddress addr = IpAddress.valueOf(0); 955 final IpAddress addr = IpAddress.valueOf(0);
953 - final Partition p = new Partition("test-pt", Sets.newHashSet(nid)); 956 + final Partition p = new Partition() {
957 + public PartitionId getId() {
958 + return PartitionId.from(1);
959 + }
960 +
961 + @Override
962 + public Collection<NodeId> getMembers() {
963 + return Sets.newHashSet(nid);
964 + }
965 + };
954 return ClusterMetadata.builder() 966 return ClusterMetadata.builder()
955 .withName("test-cluster") 967 .withName("test-cluster")
956 .withControllerNodes(Sets.newHashSet(new DefaultControllerNode(nid, addr))) 968 .withControllerNodes(Sets.newHashSet(new DefaultControllerNode(nid, addr)))
...@@ -958,12 +970,16 @@ public class LldpLinkProviderTest { ...@@ -958,12 +970,16 @@ public class LldpLinkProviderTest {
958 } 970 }
959 971
960 @Override 972 @Override
961 - public void setClusterMetadata(ClusterMetadata metadata) { 973 + public ControllerNode getLocalNode() {
974 + return null;
975 + }
976 +
977 + @Override
978 + public void addListener(ClusterMetadataEventListener listener) {
962 } 979 }
963 980
964 @Override 981 @Override
965 - public ControllerNode getLocalNode() { 982 + public void removeListener(ClusterMetadataEventListener listener) {
966 - return null;
967 } 983 }
968 } 984 }
969 } 985 }
......
...@@ -150,7 +150,7 @@ cat > $STAGE/config/cluster.json <<EOF ...@@ -150,7 +150,7 @@ cat > $STAGE/config/cluster.json <<EOF
150 { 150 {
151 "name": "default", 151 "name": "default",
152 "nodes": [ {"id": "$IP", "ip": "$IP", "port": 9876 } ], 152 "nodes": [ {"id": "$IP", "ip": "$IP", "port": 9876 } ],
153 - "partitions": [ { "name": "p1", "members": [ "$IP" ] } ] 153 + "partitions": [ { "id": 0, "members": [ "$IP" ] }, { "id": 1, "members": [ "$IP" ] } ]
154 } 154 }
155 EOF 155 EOF
156 156
......
...@@ -27,12 +27,18 @@ def get_nodes(vars, port=9876): ...@@ -27,12 +27,18 @@ def get_nodes(vars, port=9876):
27 node = lambda k: { 'id': k, 'ip': k, 'port': port } 27 node = lambda k: { 'id': k, 'ip': k, 'port': port }
28 return [ node(environ[v]) for v in vars ] 28 return [ node(environ[v]) for v in vars ]
29 29
30 -def generate_permutations(nodes, k): 30 +def generate_base_partition(nodes):
31 + return {
32 + 'id': 0,
33 + 'members': nodes
34 + }
35 +
36 +def generate_extended_partitions(nodes, k):
31 l = deque(nodes) 37 l = deque(nodes)
32 perms = [] 38 perms = []
33 for i in range(1, len(nodes)+1): 39 for i in range(1, len(nodes)+1):
34 part = { 40 part = {
35 - 'name': 'p%d' % i, 41 + 'id': i,
36 'members': list(l)[:k] 42 'members': list(l)[:k]
37 } 43 }
38 perms.append(part) 44 perms.append(part)
...@@ -42,7 +48,11 @@ def generate_permutations(nodes, k): ...@@ -42,7 +48,11 @@ def generate_permutations(nodes, k):
42 if __name__ == '__main__': 48 if __name__ == '__main__':
43 vars = get_OC_vars() 49 vars = get_OC_vars()
44 nodes = get_nodes(vars) 50 nodes = get_nodes(vars)
45 - partitions = generate_permutations([v.get('id') for v in nodes], 3) 51 + base_partition = generate_base_partition([v.get('id') for v in nodes])
52 + extended_partitions = generate_extended_partitions([v.get('id') for v in nodes], 3)
53 + partitions = []
54 + partitions.append(base_partition)
55 + partitions.extend(extended_partitions)
46 name = 0 56 name = 0
47 for node in nodes: 57 for node in nodes:
48 name = name ^ hash(node['ip']) 58 name = name ^ hash(node['ip'])
......