Jonathan Hart

Automatically rebalance intent key partitions on cluster change.

Also sorted output of leaders command by leader IP.

Change-Id: Ie85896a4f6f50489ebd7994c905808ce34fca94c
...@@ -20,6 +20,7 @@ import org.onosproject.cli.AbstractShellCommand; ...@@ -20,6 +20,7 @@ import org.onosproject.cli.AbstractShellCommand;
20 import org.onosproject.cluster.Leadership; 20 import org.onosproject.cluster.Leadership;
21 import org.onosproject.cluster.LeadershipService; 21 import org.onosproject.cluster.LeadershipService;
22 22
23 +import java.util.Comparator;
23 import java.util.Map; 24 import java.util.Map;
24 25
25 /** 26 /**
...@@ -29,17 +30,32 @@ import java.util.Map; ...@@ -29,17 +30,32 @@ import java.util.Map;
29 description = "Finds the leader for particular topic.") 30 description = "Finds the leader for particular topic.")
30 public class LeaderCommand extends AbstractShellCommand { 31 public class LeaderCommand extends AbstractShellCommand {
31 32
32 - private static final String FMT = "%-20s: %15s %5s"; 33 + private static final String FMT = "%-20s: %15s %15s";
33 34
34 @Override 35 @Override
35 protected void execute() { 36 protected void execute() {
36 LeadershipService leaderService = get(LeadershipService.class); 37 LeadershipService leaderService = get(LeadershipService.class);
37 Map<String, Leadership> leaderBoard = leaderService.getLeaderBoard(); 38 Map<String, Leadership> leaderBoard = leaderService.getLeaderBoard();
38 print(FMT, "Topic", "Leader", "Epoch"); 39 print(FMT, "Topic", "Leader", "Epoch");
39 - for (String topic : leaderBoard.keySet()) { 40 +
40 - Leadership leadership = leaderBoard.get(topic); 41 + Comparator<Leadership> leadershipComparator =
41 - print(FMT, topic, leadership.leader(), leadership.epoch()); 42 + (e1, e2) -> {
42 - } 43 + if (e1.leader() == null && e2.leader() == null) {
44 + return 0;
45 + }
46 + if (e1.leader() == null) {
47 + return 1;
48 + }
49 + if (e2.leader() == null) {
50 + return -1;
51 + }
52 + return e1.leader().toString().compareTo(e2.leader().toString());
53 + };
54 +
55 + leaderBoard.values()
56 + .stream()
57 + .sorted(leadershipComparator)
58 + .forEach(l -> print(FMT, l.topic(), l.leader(), l.epoch()));
43 } 59 }
44 60
45 } 61 }
......
...@@ -24,17 +24,26 @@ import java.util.Objects; ...@@ -24,17 +24,26 @@ import java.util.Objects;
24 * processed by a single ONOS instance at a time. 24 * processed by a single ONOS instance at a time.
25 */ 25 */
26 public class PartitionId { 26 public class PartitionId {
27 - private final long id; 27 + private final int id;
28 28
29 /** 29 /**
30 * Creates a new partition ID. 30 * Creates a new partition ID.
31 * 31 *
32 * @param id the partition ID 32 * @param id the partition ID
33 */ 33 */
34 - PartitionId(long id) { 34 + PartitionId(int id) {
35 this.id = id; 35 this.id = id;
36 } 36 }
37 37
38 + /**
39 + * Returns the integer ID value.
40 + *
41 + * @return ID value
42 + */
43 + public int value() {
44 + return id;
45 + }
46 +
38 @Override 47 @Override
39 public boolean equals(Object o) { 48 public boolean equals(Object o) {
40 if (!(o instanceof PartitionId)) { 49 if (!(o instanceof PartitionId)) {
......
...@@ -21,7 +21,10 @@ import org.apache.felix.scr.annotations.Deactivate; ...@@ -21,7 +21,10 @@ import org.apache.felix.scr.annotations.Deactivate;
21 import org.apache.felix.scr.annotations.Reference; 21 import org.apache.felix.scr.annotations.Reference;
22 import org.apache.felix.scr.annotations.ReferenceCardinality; 22 import org.apache.felix.scr.annotations.ReferenceCardinality;
23 import org.apache.felix.scr.annotations.Service; 23 import org.apache.felix.scr.annotations.Service;
24 +import org.onosproject.cluster.ClusterEvent;
25 +import org.onosproject.cluster.ClusterEventListener;
24 import org.onosproject.cluster.ClusterService; 26 import org.onosproject.cluster.ClusterService;
27 +import org.onosproject.cluster.ControllerNode;
25 import org.onosproject.cluster.Leadership; 28 import org.onosproject.cluster.Leadership;
26 import org.onosproject.cluster.LeadershipEvent; 29 import org.onosproject.cluster.LeadershipEvent;
27 import org.onosproject.cluster.LeadershipEventListener; 30 import org.onosproject.cluster.LeadershipEventListener;
...@@ -31,8 +34,12 @@ import org.slf4j.Logger; ...@@ -31,8 +34,12 @@ import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory; 34 import org.slf4j.LoggerFactory;
32 35
33 import java.util.Collections; 36 import java.util.Collections;
37 +import java.util.Iterator;
34 import java.util.Set; 38 import java.util.Set;
35 import java.util.concurrent.ConcurrentHashMap; 39 import java.util.concurrent.ConcurrentHashMap;
40 +import java.util.concurrent.Executors;
41 +import java.util.concurrent.ScheduledExecutorService;
42 +import java.util.concurrent.TimeUnit;
36 43
37 /** 44 /**
38 * Manages the assignment of intent keyspace partitions to instances. 45 * Manages the assignment of intent keyspace partitions to instances.
...@@ -49,35 +56,48 @@ public class PartitionManager implements PartitionService { ...@@ -49,35 +56,48 @@ public class PartitionManager implements PartitionService {
49 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 56 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
50 protected ClusterService clusterService; 57 protected ClusterService clusterService;
51 58
52 - // TODO make configurable 59 + private static final int NUM_PARTITIONS = 32;
53 - private static final int NUM_PARTITIONS = 100; 60 + private static final int BACKOFF_TIME = 2;
61 + private static final int CHECK_PERIOD = 10;
54 62
55 private static final String ELECTION_PREFIX = "intent-partition-"; 63 private static final String ELECTION_PREFIX = "intent-partition-";
56 64
57 private LeadershipEventListener leaderListener = new InternalLeadershipListener(); 65 private LeadershipEventListener leaderListener = new InternalLeadershipListener();
66 + private ClusterEventListener clusterListener = new InternalClusterEventListener();
58 67
59 - private Set<PartitionId> myPartitions; 68 + private final Set<PartitionId> myPartitions
69 + = Collections.newSetFromMap(new ConcurrentHashMap<>());
70 +
71 + private ScheduledExecutorService executor = Executors
72 + .newScheduledThreadPool(1);
60 73
61 @Activate 74 @Activate
62 public void activate() { 75 public void activate() {
63 - myPartitions = Collections.newSetFromMap(new ConcurrentHashMap<>());
64 -
65 leadershipService.addListener(leaderListener); 76 leadershipService.addListener(leaderListener);
77 + clusterService.addListener(clusterListener);
66 78
67 for (int i = 0; i < NUM_PARTITIONS; i++) { 79 for (int i = 0; i < NUM_PARTITIONS; i++) {
68 - leadershipService.runForLeadership(ELECTION_PREFIX + i); 80 + leadershipService.runForLeadership(getPartitionPath(i));
69 } 81 }
82 +
83 + executor.scheduleAtFixedRate(this::doRelinquish, 0,
84 + CHECK_PERIOD, TimeUnit.SECONDS);
70 } 85 }
71 86
72 @Deactivate 87 @Deactivate
73 public void deactivate() { 88 public void deactivate() {
74 leadershipService.removeListener(leaderListener); 89 leadershipService.removeListener(leaderListener);
90 + clusterService.removeListener(clusterListener);
91 + }
92 +
93 + private String getPartitionPath(int i) {
94 + return ELECTION_PREFIX + i;
75 } 95 }
76 96
77 private PartitionId getPartitionForKey(Key intentKey) { 97 private PartitionId getPartitionForKey(Key intentKey) {
78 log.debug("Getting partition for {}: {}", intentKey, 98 log.debug("Getting partition for {}: {}", intentKey,
79 - new PartitionId(Math.abs(intentKey.hash()) % NUM_PARTITIONS)); 99 + new PartitionId((int) Math.abs(intentKey.hash()) % NUM_PARTITIONS));
80 - return new PartitionId(Math.abs(intentKey.hash()) % NUM_PARTITIONS); 100 + return new PartitionId((int) Math.abs(intentKey.hash()) % NUM_PARTITIONS);
81 } 101 }
82 102
83 @Override 103 @Override
...@@ -85,6 +105,58 @@ public class PartitionManager implements PartitionService { ...@@ -85,6 +105,58 @@ public class PartitionManager implements PartitionService {
85 return myPartitions.contains(getPartitionForKey(intentKey)); 105 return myPartitions.contains(getPartitionForKey(intentKey));
86 } 106 }
87 107
108 + private void doRelinquish() {
109 + try {
110 + relinquish();
111 + } catch (Exception e) {
112 + log.warn("Exception caught during relinquish task", e);
113 + }
114 + }
115 +
116 +
117 + /**
118 + * Determine whether we have more than our fair share of partitions, and if
119 + * so, relinquish leadership of some of them for a little while to let
120 + * other instances take over.
121 + */
122 + private void relinquish() {
123 + int activeNodes = (int) clusterService.getNodes()
124 + .stream()
125 + .filter(n -> clusterService.getState(n.id())
126 + == ControllerNode.State.ACTIVE)
127 + .count();
128 +
129 + int myShare = (int) Math.ceil((double) NUM_PARTITIONS / activeNodes);
130 +
131 + synchronized (myPartitions) {
132 + int relinquish = myPartitions.size() - myShare;
133 +
134 + if (relinquish <= 0) {
135 + return;
136 + }
137 +
138 + Iterator<PartitionId> it = myPartitions.iterator();
139 + for (int i = 0; i < relinquish; i++) {
140 + PartitionId id = it.next();
141 + it.remove();
142 +
143 + leadershipService.withdraw(getPartitionPath(id.value()));
144 +
145 + executor.schedule(() -> recontest(getPartitionPath(id.value())),
146 + BACKOFF_TIME, TimeUnit.SECONDS);
147 + }
148 + }
149 + }
150 +
151 + /**
152 + * Try and recontest for leadership of a partition.
153 + *
154 + * @param path topic name to recontest
155 + */
156 + private void recontest(String path) {
157 + leadershipService.runForLeadership(path);
158 + }
159 +
88 private final class InternalLeadershipListener implements LeadershipEventListener { 160 private final class InternalLeadershipListener implements LeadershipEventListener {
89 161
90 @Override 162 @Override
...@@ -109,12 +181,26 @@ public class PartitionManager implements PartitionService { ...@@ -109,12 +181,26 @@ public class PartitionManager implements PartitionService {
109 return; 181 return;
110 } 182 }
111 183
112 - if (event.type() == LeadershipEvent.Type.LEADER_ELECTED) { 184 + synchronized (myPartitions) {
113 - myPartitions.add(new PartitionId(partitionId)); 185 + if (event.type() == LeadershipEvent.Type.LEADER_ELECTED) {
114 - } else if (event.type() == LeadershipEvent.Type.LEADER_BOOTED) { 186 + myPartitions.add(new PartitionId(partitionId));
115 - myPartitions.remove(new PartitionId(partitionId)); 187 + } else if (event.type() == LeadershipEvent.Type.LEADER_BOOTED) {
188 + myPartitions.remove(new PartitionId(partitionId));
189 + }
116 } 190 }
191 +
192 + // See if we need to let some partitions go
193 + relinquish();
117 } 194 }
118 } 195 }
119 } 196 }
197 +
198 + private final class InternalClusterEventListener implements
199 + ClusterEventListener {
200 +
201 + @Override
202 + public void event(ClusterEvent event) {
203 + relinquish();
204 + }
205 + }
120 } 206 }
......