ONOS-1981: Move partition manager rebalance activity off of the event loop thread
Change-Id: I32241a53be683dbf2611069072f80269655baba8
Showing
2 changed files
with
55 additions
and
25 deletions
... | @@ -40,6 +40,7 @@ import java.util.Objects; | ... | @@ -40,6 +40,7 @@ import java.util.Objects; |
40 | import java.util.concurrent.Executors; | 40 | import java.util.concurrent.Executors; |
41 | import java.util.concurrent.ScheduledExecutorService; | 41 | import java.util.concurrent.ScheduledExecutorService; |
42 | import java.util.concurrent.TimeUnit; | 42 | import java.util.concurrent.TimeUnit; |
43 | +import java.util.concurrent.atomic.AtomicBoolean; | ||
43 | import java.util.stream.Collectors; | 44 | import java.util.stream.Collectors; |
44 | 45 | ||
45 | /** | 46 | /** |
... | @@ -57,9 +58,12 @@ public class PartitionManager implements PartitionService { | ... | @@ -57,9 +58,12 @@ public class PartitionManager implements PartitionService { |
57 | @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) | 58 | @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) |
58 | protected ClusterService clusterService; | 59 | protected ClusterService clusterService; |
59 | 60 | ||
61 | + protected final AtomicBoolean rebalanceScheduled = new AtomicBoolean(false); | ||
62 | + | ||
60 | static final int NUM_PARTITIONS = 14; | 63 | static final int NUM_PARTITIONS = 14; |
61 | private static final int BACKOFF_TIME = 2; | 64 | private static final int BACKOFF_TIME = 2; |
62 | - private static final int CHECK_PERIOD = 10; | 65 | + private static final int CHECK_PARTITION_BALANCE_PERIOD_SEC = 10; |
66 | + private static final int RETRY_AFTER_DELAY_SEC = 5; | ||
63 | 67 | ||
64 | private static final String ELECTION_PREFIX = "intent-partition-"; | 68 | private static final String ELECTION_PREFIX = "intent-partition-"; |
65 | 69 | ||
... | @@ -78,8 +82,8 @@ public class PartitionManager implements PartitionService { | ... | @@ -78,8 +82,8 @@ public class PartitionManager implements PartitionService { |
78 | leadershipService.runForLeadership(getPartitionPath(i)); | 82 | leadershipService.runForLeadership(getPartitionPath(i)); |
79 | } | 83 | } |
80 | 84 | ||
81 | - executor.scheduleAtFixedRate(this::doRelinquish, 0, | 85 | + executor.scheduleAtFixedRate(() -> scheduleRebalance(0), 0, |
82 | - CHECK_PERIOD, TimeUnit.SECONDS); | 86 | + CHECK_PARTITION_BALANCE_PERIOD_SEC, TimeUnit.SECONDS); |
83 | } | 87 | } |
84 | 88 | ||
85 | @Deactivate | 89 | @Deactivate |
... | @@ -129,11 +133,13 @@ public class PartitionManager implements PartitionService { | ... | @@ -129,11 +133,13 @@ public class PartitionManager implements PartitionService { |
129 | return leadershipService.getLeader(getPartitionPath(getPartitionForKey(intentKey))); | 133 | return leadershipService.getLeader(getPartitionPath(getPartitionForKey(intentKey))); |
130 | } | 134 | } |
131 | 135 | ||
132 | - private void doRelinquish() { | 136 | + protected void doRebalance() { |
137 | + rebalanceScheduled.set(false); | ||
133 | try { | 138 | try { |
134 | - relinquish(); | 139 | + rebalance(); |
135 | } catch (Exception e) { | 140 | } catch (Exception e) { |
136 | - log.warn("Exception caught during relinquish task", e); | 141 | + log.warn("Exception caught during rebalance task. Will retry in " + RETRY_AFTER_DELAY_SEC + " seconds", e); |
142 | + scheduleRebalance(RETRY_AFTER_DELAY_SEC); | ||
137 | } | 143 | } |
138 | } | 144 | } |
139 | 145 | ||
... | @@ -142,11 +148,10 @@ public class PartitionManager implements PartitionService { | ... | @@ -142,11 +148,10 @@ public class PartitionManager implements PartitionService { |
142 | * so, relinquish leadership of some of them for a little while to let | 148 | * so, relinquish leadership of some of them for a little while to let |
143 | * other instances take over. | 149 | * other instances take over. |
144 | */ | 150 | */ |
145 | - private void relinquish() { | 151 | + private void rebalance() { |
146 | int activeNodes = (int) clusterService.getNodes() | 152 | int activeNodes = (int) clusterService.getNodes() |
147 | .stream() | 153 | .stream() |
148 | - .filter(n -> clusterService.getState(n.id()) | 154 | + .filter(node -> ControllerNode.State.ACTIVE == clusterService.getState(node.id())) |
149 | - == ControllerNode.State.ACTIVE) | ||
150 | .count(); | 155 | .count(); |
151 | 156 | ||
152 | int myShare = (int) Math.ceil((double) NUM_PARTITIONS / activeNodes); | 157 | int myShare = (int) Math.ceil((double) NUM_PARTITIONS / activeNodes); |
... | @@ -172,6 +177,12 @@ public class PartitionManager implements PartitionService { | ... | @@ -172,6 +177,12 @@ public class PartitionManager implements PartitionService { |
172 | } | 177 | } |
173 | } | 178 | } |
174 | 179 | ||
180 | + private void scheduleRebalance(int afterDelaySec) { | ||
181 | + if (rebalanceScheduled.compareAndSet(false, true)) { | ||
182 | + executor.schedule(this::doRebalance, afterDelaySec, TimeUnit.SECONDS); | ||
183 | + } | ||
184 | + } | ||
185 | + | ||
175 | /** | 186 | /** |
176 | * Try and recontest for leadership of a partition. | 187 | * Try and recontest for leadership of a partition. |
177 | * | 188 | * |
... | @@ -191,7 +202,7 @@ public class PartitionManager implements PartitionService { | ... | @@ -191,7 +202,7 @@ public class PartitionManager implements PartitionService { |
191 | leadership.topic().startsWith(ELECTION_PREFIX)) { | 202 | leadership.topic().startsWith(ELECTION_PREFIX)) { |
192 | 203 | ||
193 | // See if we need to let some partitions go | 204 | // See if we need to let some partitions go |
194 | - relinquish(); | 205 | + scheduleRebalance(0); |
195 | } | 206 | } |
196 | } | 207 | } |
197 | } | 208 | } |
... | @@ -201,7 +212,7 @@ public class PartitionManager implements PartitionService { | ... | @@ -201,7 +212,7 @@ public class PartitionManager implements PartitionService { |
201 | 212 | ||
202 | @Override | 213 | @Override |
203 | public void event(ClusterEvent event) { | 214 | public void event(ClusterEvent event) { |
204 | - relinquish(); | 215 | + scheduleRebalance(0); |
205 | } | 216 | } |
206 | } | 217 | } |
207 | } | 218 | } | ... | ... |
... | @@ -172,41 +172,60 @@ public class PartitionManagerTest { | ... | @@ -172,41 +172,60 @@ public class PartitionManagerTest { |
172 | /** | 172 | /** |
173 | * Tests sending in LeadershipServiceEvents in the case when we have | 173 | * Tests sending in LeadershipServiceEvents in the case when we have |
174 | * too many partitions. The event will trigger the partition manager to | 174 | * too many partitions. The event will trigger the partition manager to |
175 | - * reassess how many partitions it has and relinquish some. | 175 | + * schedule a rebalancing activity. |
176 | */ | 176 | */ |
177 | @Test | 177 | @Test |
178 | - public void testRelinquish() { | 178 | + public void testRebalanceScheduling() { |
179 | // We have all the partitions so we'll need to relinquish some | 179 | // We have all the partitions so we'll need to relinquish some |
180 | setUpLeadershipService(PartitionManager.NUM_PARTITIONS); | 180 | setUpLeadershipService(PartitionManager.NUM_PARTITIONS); |
181 | 181 | ||
182 | - expect(leadershipService.withdraw(anyString())) | ||
183 | - .andReturn(CompletableFuture.completedFuture(null)) | ||
184 | - .times(7); | ||
185 | - | ||
186 | replay(leadershipService); | 182 | replay(leadershipService); |
187 | 183 | ||
188 | partitionManager.activate(); | 184 | partitionManager.activate(); |
189 | // Send in the event | 185 | // Send in the event |
190 | leaderListener.event(event); | 186 | leaderListener.event(event); |
191 | 187 | ||
188 | + assertTrue(partitionManager.rebalanceScheduled.get()); | ||
189 | + | ||
192 | verify(leadershipService); | 190 | verify(leadershipService); |
193 | } | 191 | } |
194 | 192 | ||
195 | /** | 193 | /** |
196 | - * Tests sending in LeadershipServiceEvents in the case when we have the | 194 | + * Tests rebalance will trigger the right now of leadership withdraw calls. |
197 | - * right amount or too many partitions. These events will not trigger any | ||
198 | - * partition reassignments. | ||
199 | */ | 195 | */ |
200 | @Test | 196 | @Test |
201 | - public void testNoRelinquish() { | 197 | + public void testRebalance() { |
198 | + // We have all the partitions so we'll need to relinquish some | ||
199 | + setUpLeadershipService(PartitionManager.NUM_PARTITIONS); | ||
200 | + | ||
201 | + expect(leadershipService.withdraw(anyString())) | ||
202 | + .andReturn(CompletableFuture.completedFuture(null)) | ||
203 | + .times(7); | ||
204 | + | ||
205 | + replay(leadershipService); | ||
206 | + | ||
207 | + partitionManager.activate(); | ||
208 | + | ||
209 | + // trigger rebalance | ||
210 | + partitionManager.doRebalance(); | ||
211 | + | ||
212 | + verify(leadershipService); | ||
213 | + } | ||
214 | + | ||
215 | + /** | ||
216 | + * Tests that attempts to rebalance when the paritions are already | ||
217 | + * evenly distributed does not result in any relinquish attempts. | ||
218 | + */ | ||
219 | + @Test | ||
220 | + public void testNoRebalance() { | ||
202 | // Partitions are already perfectly balanced among the two active instances | 221 | // Partitions are already perfectly balanced among the two active instances |
203 | setUpLeadershipService(PartitionManager.NUM_PARTITIONS / 2); | 222 | setUpLeadershipService(PartitionManager.NUM_PARTITIONS / 2); |
204 | replay(leadershipService); | 223 | replay(leadershipService); |
205 | 224 | ||
206 | partitionManager.activate(); | 225 | partitionManager.activate(); |
207 | 226 | ||
208 | - // Send in the event | 227 | + // trigger rebalance |
209 | - leaderListener.event(event); | 228 | + partitionManager.doRebalance(); |
210 | 229 | ||
211 | verify(leadershipService); | 230 | verify(leadershipService); |
212 | 231 | ||
... | @@ -215,8 +234,8 @@ public class PartitionManagerTest { | ... | @@ -215,8 +234,8 @@ public class PartitionManagerTest { |
215 | setUpLeadershipService(PartitionManager.NUM_PARTITIONS / 2 - 1); | 234 | setUpLeadershipService(PartitionManager.NUM_PARTITIONS / 2 - 1); |
216 | replay(leadershipService); | 235 | replay(leadershipService); |
217 | 236 | ||
218 | - // Send in the event | 237 | + // trigger rebalance |
219 | - leaderListener.event(event); | 238 | + partitionManager.doRebalance(); |
220 | 239 | ||
221 | verify(leadershipService); | 240 | verify(leadershipService); |
222 | } | 241 | } | ... | ... |
-
Please register or login to post a comment