Madan Jampani
Committed by Gerrit Code Review

ClusterManager support for reacting to cluster metadata changes

Change-Id: I7befaf4f955bda093d89c3c431eae6814409ae03
1 +/*
2 + * Copyright 2016 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.cluster;
17 +
18 +import static com.google.common.base.Preconditions.checkState;
19 +
20 +import java.util.Map;
21 +import java.util.Set;
22 +import java.util.stream.Collectors;
23 +
24 +import com.google.common.collect.ImmutableSet;
25 +import com.google.common.collect.Maps;
26 +import com.google.common.collect.Sets;
27 +
28 +/**
29 + * Utility for examining differences between two {@link ClusterMetadata metadata} values.
30 + */
31 +public class ClusterMetadataDiff {
32 +
33 + private final ClusterMetadata oldValue;
34 + private final ClusterMetadata newValue;
35 + private final Set<ControllerNode> nodesAdded;
36 + private final Set<NodeId> nodesRemoved;
37 +
38 + public ClusterMetadataDiff(ClusterMetadata oldValue, ClusterMetadata newValue) {
39 + this.oldValue = oldValue;
40 + this.newValue = newValue;
41 +
42 + Set<ControllerNode> currentNodeSet = oldValue == null
43 + ? ImmutableSet.of() : ImmutableSet.copyOf(oldValue.getNodes());
44 + Set<ControllerNode> newNodeSet = newValue == null
45 + ? ImmutableSet.of() : ImmutableSet.copyOf(newValue.getNodes());
46 + nodesAdded = Sets.difference(newNodeSet, currentNodeSet);
47 + nodesRemoved = Sets.difference(currentNodeSet, newNodeSet)
48 + .stream()
49 + .map(ControllerNode::id)
50 + .collect(Collectors.toSet());
51 + }
52 +
53 + /**
54 + * Returns the set of {@link ControllerNode nodes} added with this metadata change.
55 + * @return set of controller nodes
56 + */
57 + public Set<ControllerNode> nodesAdded() {
58 + return nodesAdded;
59 + }
60 +
61 + /**
62 + * Returns the set of {@link ControllerNode nodes} removed with this metadata change.
63 + * @return set of controller node identifiers
64 + */
65 + public Set<NodeId> nodesRemoved() {
66 + return nodesRemoved;
67 + }
68 +
69 + /**
70 + * Returns a mapping of all partition diffs.
71 + * @return partition diffs.
72 + */
73 + public Map<PartitionId, PartitionDiff> partitionDiffs() {
74 + Map<PartitionId, Partition> oldPartitions = Maps.newHashMap();
75 + oldValue.getPartitions()
76 + .forEach(p -> oldPartitions.put(p.getId(), p));
77 + Map<PartitionId, Partition> newPartitions = Maps.newHashMap();
78 + newValue.getPartitions()
79 + .forEach(p -> newPartitions.put(p.getId(), p));
80 + checkState(Sets.symmetricDifference(oldPartitions.keySet(), newPartitions.keySet()).isEmpty(),
81 + "Number of partitions cannot change");
82 + Map<PartitionId, PartitionDiff> partitionDiffs = Maps.newHashMap();
83 + oldPartitions.forEach((k, v) -> {
84 + partitionDiffs.put(k, new PartitionDiff(v, newPartitions.get(k)));
85 + });
86 + return partitionDiffs;
87 + }
88 +}
1 +/*
2 + * Copyright 2016 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.cluster;
17 +
18 +import java.util.Objects;
19 +import java.util.Set;
20 +
21 +import com.google.common.base.MoreObjects;
22 +import com.google.common.collect.ImmutableSet;
23 +import com.google.common.collect.Sets;
24 +
25 +/**
26 + * Utility for examining differences between two {@link Partition partition} values.
27 + */
28 +public class PartitionDiff {
29 +
30 + private final Partition oldValue;
31 + private final Partition newValue;
32 + private final PartitionId partitionId;
33 + private final Set<NodeId> currentMembers;
34 + private final Set<NodeId> newMembers;
35 +
36 + public PartitionDiff(Partition oldValue, Partition newValue) {
37 + this.oldValue = oldValue;
38 + this.newValue = newValue;
39 + this.partitionId = oldValue.getId();
40 + this.currentMembers = oldValue == null ? ImmutableSet.of() : ImmutableSet.copyOf(oldValue.getMembers());
41 + this.newMembers = newValue == null ? ImmutableSet.of() : ImmutableSet.copyOf(newValue.getMembers());
42 + }
43 +
44 + /**
45 + * Returns the new partition identifier.
46 + * @return partition id
47 + */
48 + public PartitionId partitionId() {
49 + return partitionId;
50 + }
51 +
52 + /**
53 + * Returns the old partition value.
54 + * @return partition
55 + */
56 + public Partition oldValue() {
57 + return oldValue;
58 + }
59 +
60 + /**
61 + * Returns the new partition value.
62 + * @return partition
63 + */
64 + public Partition newValue() {
65 + return newValue;
66 + }
67 +
68 + /**
69 + * Returns if there are differences between the two values.
70 + * @return {@code true} if yes; {@code false} otherwise
71 + */
72 + public boolean hasChanged() {
73 + return !Sets.symmetricDifference(currentMembers, newMembers).isEmpty();
74 + }
75 +
76 + /**
77 + * Returns if the specified node is introduced in the new value.
78 + * @param nodeId node identifier
79 + * @return {@code true} if yes; {@code false} otherwise
80 + */
81 + public boolean isAdded(NodeId nodeId) {
82 + return !currentMembers.contains(nodeId) && newMembers.contains(nodeId);
83 + }
84 +
85 + /**
86 + * Returns if the specified node is removed in the new value.
87 + * @param nodeId node identifier
88 + * @return {@code true} if yes; {@code false} otherwise
89 + */
90 + public boolean isRemoved(NodeId nodeId) {
91 + return currentMembers.contains(nodeId) && !newMembers.contains(nodeId);
92 + }
93 +
94 + @Override
95 + public int hashCode() {
96 + return Objects.hash(oldValue, newValue);
97 + }
98 +
99 + @Override
100 + public boolean equals(Object other) {
101 + if (other == null || !(other instanceof PartitionDiff)) {
102 + return false;
103 + }
104 + PartitionDiff that = (PartitionDiff) other;
105 + return Objects.equals(this.oldValue, that.oldValue) &&
106 + Objects.equals(this.newValue, that.newValue);
107 +
108 + }
109 +
110 + @Override
111 + public String toString() {
112 + return MoreObjects.toStringHelper(getClass())
113 + .add("oldValue", oldValue)
114 + .add("newValue", newValue)
115 + .toString();
116 + }
117 +}
1 +/*
2 + * Copyright 2016 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.cluster;
17 +
18 +import static org.junit.Assert.assertEquals;
19 +import static org.junit.Assert.assertFalse;
20 +import static org.junit.Assert.assertTrue;
21 +
22 +import org.junit.Test;
23 +import org.onlab.packet.IpAddress;
24 +
25 +import com.google.common.collect.ImmutableSet;
26 +import com.google.common.collect.Sets;
27 +
28 +/**
29 + * Unit tests for ClusterMetadataDiff.
30 + */
31 +public class ClusterMetadataDiffTest {
32 +
33 + @Test
34 + public void testDiffNoChange() {
35 + PartitionId pid1 = PartitionId.from(1);
36 + NodeId nid1 = NodeId.nodeId("10.0.0.1");
37 + ControllerNode n1 = new DefaultControllerNode(nid1, IpAddress.valueOf("10.0.0.1"), 9876);
38 + Partition p1 = new DefaultPartition(pid1, ImmutableSet.of(nid1));
39 + ClusterMetadata md1 = new ClusterMetadata("foo", ImmutableSet.of(n1), ImmutableSet.of(p1));
40 + ClusterMetadataDiff diff = new ClusterMetadataDiff(md1, md1);
41 + assertTrue(diff.nodesAdded().isEmpty());
42 + assertTrue(diff.nodesRemoved().isEmpty());
43 + assertEquals(diff.partitionDiffs().size(), 1);
44 + assertEquals(diff.partitionDiffs().keySet(), Sets.newHashSet(pid1));
45 + PartitionDiff pdiff = diff.partitionDiffs().get(pid1);
46 + assertFalse(pdiff.hasChanged());
47 + }
48 +
49 + @Test
50 + public void testDiffForScaleUp() {
51 + PartitionId pid1 = PartitionId.from(1);
52 + NodeId nid1 = NodeId.nodeId("10.0.0.1");
53 + NodeId nid2 = NodeId.nodeId("10.0.0.2");
54 + ControllerNode n1 = new DefaultControllerNode(nid1, IpAddress.valueOf("10.0.0.1"), 9876);
55 + ControllerNode n2 = new DefaultControllerNode(nid2, IpAddress.valueOf("10.0.0.2"), 9876);
56 + Partition p1 = new DefaultPartition(pid1, ImmutableSet.of(nid1));
57 + Partition p12 = new DefaultPartition(pid1, ImmutableSet.of(nid1, nid2));
58 + ClusterMetadata md1 = new ClusterMetadata("foo", ImmutableSet.of(n1), ImmutableSet.of(p1));
59 + ClusterMetadata md12 = new ClusterMetadata("foo", ImmutableSet.of(n1, n2), ImmutableSet.of(p12));
60 + ClusterMetadataDiff diff = new ClusterMetadataDiff(md1, md12);
61 + assertEquals(diff.nodesAdded(), Sets.newHashSet(n2));
62 + assertTrue(diff.nodesRemoved().isEmpty());
63 + assertEquals(diff.partitionDiffs().size(), 1);
64 + assertEquals(diff.partitionDiffs().keySet(), Sets.newHashSet(pid1));
65 + PartitionDiff pdiff = diff.partitionDiffs().get(pid1);
66 + assertTrue(pdiff.hasChanged());
67 + assertFalse(pdiff.isAdded(nid1));
68 + assertTrue(pdiff.isAdded(nid2));
69 + assertFalse(pdiff.isRemoved(nid1));
70 + assertFalse(pdiff.isAdded(nid1));
71 + }
72 +
73 + @Test
74 + public void testDiffForScaleDown() {
75 + PartitionId pid1 = PartitionId.from(1);
76 + NodeId nid1 = NodeId.nodeId("10.0.0.1");
77 + NodeId nid2 = NodeId.nodeId("10.0.0.2");
78 + ControllerNode n1 = new DefaultControllerNode(nid1, IpAddress.valueOf("10.0.0.1"), 9876);
79 + ControllerNode n2 = new DefaultControllerNode(nid2, IpAddress.valueOf("10.0.0.2"), 9876);
80 + Partition p1 = new DefaultPartition(pid1, ImmutableSet.of(nid1));
81 + Partition p12 = new DefaultPartition(pid1, ImmutableSet.of(nid1, nid2));
82 + ClusterMetadata md1 = new ClusterMetadata("foo", ImmutableSet.of(n1), ImmutableSet.of(p1));
83 + ClusterMetadata md12 = new ClusterMetadata("foo", ImmutableSet.of(n1, n2), ImmutableSet.of(p12));
84 + ClusterMetadataDiff diff = new ClusterMetadataDiff(md12, md1);
85 + assertEquals(diff.nodesRemoved(), Sets.newHashSet(nid2));
86 + assertTrue(diff.nodesAdded().isEmpty());
87 + assertEquals(diff.partitionDiffs().size(), 1);
88 + assertEquals(diff.partitionDiffs().keySet(), Sets.newHashSet(pid1));
89 + PartitionDiff pdiff = diff.partitionDiffs().get(pid1);
90 + assertTrue(pdiff.hasChanged());
91 + assertTrue(pdiff.isRemoved(nid2));
92 + assertFalse(pdiff.isAdded(nid2));
93 + assertFalse(pdiff.isRemoved(nid1));
94 + assertFalse(pdiff.isAdded(nid1));
95 + }
96 +}
...@@ -29,6 +29,9 @@ import org.onosproject.cluster.ClusterEvent; ...@@ -29,6 +29,9 @@ import org.onosproject.cluster.ClusterEvent;
29 import org.onosproject.cluster.ClusterEventListener; 29 import org.onosproject.cluster.ClusterEventListener;
30 import org.onosproject.cluster.ClusterMetadata; 30 import org.onosproject.cluster.ClusterMetadata;
31 import org.onosproject.cluster.ClusterMetadataAdminService; 31 import org.onosproject.cluster.ClusterMetadataAdminService;
32 +import org.onosproject.cluster.ClusterMetadataDiff;
33 +import org.onosproject.cluster.ClusterMetadataEvent;
34 +import org.onosproject.cluster.ClusterMetadataEventListener;
32 import org.onosproject.cluster.ClusterMetadataService; 35 import org.onosproject.cluster.ClusterMetadataService;
33 import org.onosproject.cluster.ClusterService; 36 import org.onosproject.cluster.ClusterService;
34 import org.onosproject.cluster.ClusterStore; 37 import org.onosproject.cluster.ClusterStore;
...@@ -50,6 +53,7 @@ import java.util.Collections; ...@@ -50,6 +53,7 @@ import java.util.Collections;
50 import java.util.HashSet; 53 import java.util.HashSet;
51 import java.util.List; 54 import java.util.List;
52 import java.util.Set; 55 import java.util.Set;
56 +import java.util.concurrent.atomic.AtomicReference;
53 57
54 import static com.google.common.base.Preconditions.checkArgument; 58 import static com.google.common.base.Preconditions.checkArgument;
55 import static com.google.common.base.Preconditions.checkNotNull; 59 import static com.google.common.base.Preconditions.checkNotNull;
...@@ -83,18 +87,21 @@ public class ClusterManager ...@@ -83,18 +87,21 @@ public class ClusterManager
83 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 87 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
84 protected SystemService systemService; 88 protected SystemService systemService;
85 89
90 + private final AtomicReference<ClusterMetadata> currentMetadata = new AtomicReference<>();
91 + private final InternalClusterMetadataListener metadataListener = new InternalClusterMetadataListener();
92 +
86 @Activate 93 @Activate
87 public void activate() { 94 public void activate() {
88 store.setDelegate(delegate); 95 store.setDelegate(delegate);
89 eventDispatcher.addSink(ClusterEvent.class, listenerRegistry); 96 eventDispatcher.addSink(ClusterEvent.class, listenerRegistry);
90 - clusterMetadataService.getClusterMetadata() 97 + clusterMetadataService.addListener(metadataListener);
91 - .getNodes() 98 + processMetadata(clusterMetadataService.getClusterMetadata());
92 - .forEach(node -> store.addNode(node.id(), node.ip(), node.tcpPort()));
93 log.info("Started"); 99 log.info("Started");
94 } 100 }
95 101
96 @Deactivate 102 @Deactivate
97 public void deactivate() { 103 public void deactivate() {
104 + clusterMetadataService.removeListener(metadataListener);
98 store.unsetDelegate(delegate); 105 store.unsetDelegate(delegate);
99 eventDispatcher.removeSink(ClusterEvent.class); 106 eventDispatcher.removeSink(ClusterEvent.class);
100 log.info("Stopped"); 107 log.info("Stopped");
...@@ -190,4 +197,25 @@ public class ClusterManager ...@@ -190,4 +197,25 @@ public class ClusterManager
190 } 197 }
191 return partitions; 198 return partitions;
192 } 199 }
200 +
201 + /**
202 + * Processes metadata by adding and removing nodes from the cluster.
203 + */
204 + private synchronized void processMetadata(ClusterMetadata metadata) {
205 + try {
206 + ClusterMetadataDiff examiner =
207 + new ClusterMetadataDiff(currentMetadata.get(), metadata);
208 + examiner.nodesAdded().forEach(node -> addNode(node.id(), node.ip(), node.tcpPort()));
209 + examiner.nodesRemoved().forEach(this::removeNode);
210 + } finally {
211 + currentMetadata.set(metadata);
212 + }
213 + }
214 +
215 + private class InternalClusterMetadataListener implements ClusterMetadataEventListener {
216 + @Override
217 + public void event(ClusterMetadataEvent event) {
218 + processMetadata(event.subject());
219 + }
220 + }
193 } 221 }
......