Jonathan Hart
Committed by Gerrit Code Review

PartitionManager: Don't try and store election state separately

Change-Id: Ie3733c6caae2e1d68108a6bb1d44bb784f5fedc7
...@@ -33,13 +33,11 @@ import org.onosproject.net.intent.Key; ...@@ -33,13 +33,11 @@ import org.onosproject.net.intent.Key;
33 import org.slf4j.Logger; 33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory; 34 import org.slf4j.LoggerFactory;
35 35
36 -import java.util.Collections; 36 +import java.util.List;
37 -import java.util.Iterator;
38 -import java.util.Set;
39 -import java.util.concurrent.ConcurrentHashMap;
40 import java.util.concurrent.Executors; 37 import java.util.concurrent.Executors;
41 import java.util.concurrent.ScheduledExecutorService; 38 import java.util.concurrent.ScheduledExecutorService;
42 import java.util.concurrent.TimeUnit; 39 import java.util.concurrent.TimeUnit;
40 +import java.util.stream.Collectors;
43 41
44 /** 42 /**
45 * Manages the assignment of intent keyspace partitions to instances. 43 * Manages the assignment of intent keyspace partitions to instances.
...@@ -65,9 +63,6 @@ public class PartitionManager implements PartitionService { ...@@ -65,9 +63,6 @@ public class PartitionManager implements PartitionService {
65 private LeadershipEventListener leaderListener = new InternalLeadershipListener(); 63 private LeadershipEventListener leaderListener = new InternalLeadershipListener();
66 private ClusterEventListener clusterListener = new InternalClusterEventListener(); 64 private ClusterEventListener clusterListener = new InternalClusterEventListener();
67 65
68 - private final Set<PartitionId> myPartitions
69 - = Collections.newSetFromMap(new ConcurrentHashMap<>());
70 -
71 private ScheduledExecutorService executor = Executors 66 private ScheduledExecutorService executor = Executors
72 .newScheduledThreadPool(1); 67 .newScheduledThreadPool(1);
73 68
...@@ -96,6 +91,10 @@ public class PartitionManager implements PartitionService { ...@@ -96,6 +91,10 @@ public class PartitionManager implements PartitionService {
96 return ELECTION_PREFIX + i; 91 return ELECTION_PREFIX + i;
97 } 92 }
98 93
94 + private String getPartitionPath(PartitionId id) {
95 + return getPartitionPath(id.value());
96 + }
97 +
99 private PartitionId getPartitionForKey(Key intentKey) { 98 private PartitionId getPartitionForKey(Key intentKey) {
100 int partition = Math.abs((int) intentKey.hash()) % NUM_PARTITIONS; 99 int partition = Math.abs((int) intentKey.hash()) % NUM_PARTITIONS;
101 //TODO investigate Guava consistent hash method 100 //TODO investigate Guava consistent hash method
...@@ -108,7 +107,8 @@ public class PartitionManager implements PartitionService { ...@@ -108,7 +107,8 @@ public class PartitionManager implements PartitionService {
108 107
109 @Override 108 @Override
110 public boolean isMine(Key intentKey) { 109 public boolean isMine(Key intentKey) {
111 - return myPartitions.contains(getPartitionForKey(intentKey)); 110 + return leadershipService.getLeader(getPartitionPath(getPartitionForKey(intentKey)))
111 + .equals(clusterService.getLocalNode().id());
112 } 112 }
113 113
114 private void doRelinquish() { 114 private void doRelinquish() {
...@@ -119,7 +119,6 @@ public class PartitionManager implements PartitionService { ...@@ -119,7 +119,6 @@ public class PartitionManager implements PartitionService {
119 } 119 }
120 } 120 }
121 121
122 -
123 /** 122 /**
124 * Determine whether we have more than our fair share of partitions, and if 123 * Determine whether we have more than our fair share of partitions, and if
125 * so, relinquish leadership of some of them for a little while to let 124 * so, relinquish leadership of some of them for a little while to let
...@@ -134,25 +133,26 @@ public class PartitionManager implements PartitionService { ...@@ -134,25 +133,26 @@ public class PartitionManager implements PartitionService {
134 133
135 int myShare = (int) Math.ceil((double) NUM_PARTITIONS / activeNodes); 134 int myShare = (int) Math.ceil((double) NUM_PARTITIONS / activeNodes);
136 135
137 - synchronized (myPartitions) { 136 + List<Leadership> myPartitions = leadershipService.getLeaderBoard().values()
137 + .stream()
138 + .filter(l -> clusterService.getLocalNode().id().equals(l.leader()))
139 + .filter(l -> l.topic().startsWith(ELECTION_PREFIX))
140 + .collect(Collectors.toList());
141 +
138 int relinquish = myPartitions.size() - myShare; 142 int relinquish = myPartitions.size() - myShare;
139 143
140 if (relinquish <= 0) { 144 if (relinquish <= 0) {
141 return; 145 return;
142 } 146 }
143 147
144 - Iterator<PartitionId> it = myPartitions.iterator();
145 for (int i = 0; i < relinquish; i++) { 148 for (int i = 0; i < relinquish; i++) {
146 - PartitionId id = it.next(); 149 + String topic = myPartitions.get(i).topic();
147 - it.remove(); 150 + leadershipService.withdraw(topic);
148 -
149 - leadershipService.withdraw(getPartitionPath(id.value()));
150 151
151 - executor.schedule(() -> recontest(getPartitionPath(id.value())), 152 + executor.schedule(() -> recontest(topic),
152 BACKOFF_TIME, TimeUnit.SECONDS); 153 BACKOFF_TIME, TimeUnit.SECONDS);
153 } 154 }
154 } 155 }
155 - }
156 156
157 /** 157 /**
158 * Try and recontest for leadership of a partition. 158 * Try and recontest for leadership of a partition.
...@@ -168,33 +168,10 @@ public class PartitionManager implements PartitionService { ...@@ -168,33 +168,10 @@ public class PartitionManager implements PartitionService {
168 @Override 168 @Override
169 public void event(LeadershipEvent event) { 169 public void event(LeadershipEvent event) {
170 Leadership leadership = event.subject(); 170 Leadership leadership = event.subject();
171 - // update internal state about which partitions I'm leader of 171 +
172 if (leadership.leader().equals(clusterService.getLocalNode().id()) && 172 if (leadership.leader().equals(clusterService.getLocalNode().id()) &&
173 leadership.topic().startsWith(ELECTION_PREFIX)) { 173 leadership.topic().startsWith(ELECTION_PREFIX)) {
174 174
175 - // Parse out the partition ID
176 - String[] splitted = leadership.topic().split("-");
177 - if (splitted.length != 3) {
178 - log.warn("Couldn't parse leader election topic {}", leadership.topic());
179 - return;
180 - }
181 -
182 - int partitionId;
183 - try {
184 - partitionId = Integer.parseInt(splitted[2]);
185 - } catch (NumberFormatException e) {
186 - log.warn("Couldn't parse partition ID {}", splitted[2]);
187 - return;
188 - }
189 -
190 - synchronized (myPartitions) {
191 - if (event.type() == LeadershipEvent.Type.LEADER_ELECTED) {
192 - myPartitions.add(new PartitionId(partitionId));
193 - } else if (event.type() == LeadershipEvent.Type.LEADER_BOOTED) {
194 - myPartitions.remove(new PartitionId(partitionId));
195 - }
196 - }
197 -
198 // See if we need to let some partitions go 175 // See if we need to let some partitions go
199 relinquish(); 176 relinquish();
200 } 177 }
......