Fix Mastership timstamping issue
Change-Id: I80cfa71f844ddf7d64dbff0458db193734a3f737
Showing
3 changed files
with
45 additions
and
7 deletions
... | @@ -12,6 +12,7 @@ public interface ClockProviderService { | ... | @@ -12,6 +12,7 @@ public interface ClockProviderService { |
12 | 12 | ||
13 | /** | 13 | /** |
14 | * Updates the mastership term for the specified deviceId. | 14 | * Updates the mastership term for the specified deviceId. |
15 | + * | ||
15 | * @param deviceId device identifier. | 16 | * @param deviceId device identifier. |
16 | * @param term mastership term. | 17 | * @param term mastership term. |
17 | */ | 18 | */ | ... | ... |
... | @@ -144,6 +144,10 @@ public class DeviceManager | ... | @@ -144,6 +144,10 @@ public class DeviceManager |
144 | private void applyRole(DeviceId deviceId, MastershipRole newRole) { | 144 | private void applyRole(DeviceId deviceId, MastershipRole newRole) { |
145 | if (newRole != MastershipRole.NONE) { | 145 | if (newRole != MastershipRole.NONE) { |
146 | Device device = store.getDevice(deviceId); | 146 | Device device = store.getDevice(deviceId); |
147 | + // FIXME: Device might not be there yet. (eventual consistent) | ||
148 | + if (device == null) { | ||
149 | + return; | ||
150 | + } | ||
147 | DeviceProvider provider = getProvider(device.providerId()); | 151 | DeviceProvider provider = getProvider(device.providerId()); |
148 | if (provider != null) { | 152 | if (provider != null) { |
149 | provider.roleChanged(device, newRole); | 153 | provider.roleChanged(device, newRole); |
... | @@ -193,16 +197,38 @@ public class DeviceManager | ... | @@ -193,16 +197,38 @@ public class DeviceManager |
193 | checkNotNull(deviceId, DEVICE_ID_NULL); | 197 | checkNotNull(deviceId, DEVICE_ID_NULL); |
194 | checkNotNull(deviceDescription, DEVICE_DESCRIPTION_NULL); | 198 | checkNotNull(deviceDescription, DEVICE_DESCRIPTION_NULL); |
195 | checkValidity(); | 199 | checkValidity(); |
200 | + | ||
201 | + log.info("Device {} connected", deviceId); | ||
202 | + // check my Role | ||
203 | + MastershipRole role = mastershipService.requestRoleFor(deviceId); | ||
204 | + | ||
205 | + if (role != MastershipRole.MASTER) { | ||
206 | + // TODO: Do we need to tell the Provider that | ||
207 | + // I am no longer the MASTER? | ||
208 | + return; | ||
209 | + } | ||
210 | + | ||
211 | + // Master: | ||
212 | + MastershipTerm term = mastershipService.requestTermService() | ||
213 | + .getMastershipTerm(deviceId); | ||
214 | + if (!term.master().equals(clusterService.getLocalNode().id())) { | ||
215 | + // I've lost mastership after I thought I was MASTER. | ||
216 | + return; | ||
217 | + } | ||
218 | + clockProviderService.setMastershipTerm(deviceId, term); | ||
219 | + | ||
196 | DeviceEvent event = store.createOrUpdateDevice(provider().id(), | 220 | DeviceEvent event = store.createOrUpdateDevice(provider().id(), |
197 | deviceId, deviceDescription); | 221 | deviceId, deviceDescription); |
198 | 222 | ||
199 | - // If there was a change of any kind, trigger role selection | 223 | + // If there was a change of any kind, tell the provider |
200 | - // process. | 224 | + // I am the master. |
225 | + // Note: can be null, if mastership was lost. | ||
201 | if (event != null) { | 226 | if (event != null) { |
202 | - log.info("Device {} connected", deviceId); | 227 | + // TODO: Check switch reconnected case, is it assured that |
203 | - mastershipService.requestRoleFor(deviceId); | 228 | + // event will not be null? |
204 | - provider().roleChanged(event.subject(), | 229 | + |
205 | - mastershipService.requestRoleFor(deviceId)); | 230 | + // FIXME: 1st argument should be deviceId |
231 | + provider().roleChanged(event.subject(), role); | ||
206 | post(event); | 232 | post(event); |
207 | } | 233 | } |
208 | } | 234 | } |
... | @@ -211,6 +237,10 @@ public class DeviceManager | ... | @@ -211,6 +237,10 @@ public class DeviceManager |
211 | public void deviceDisconnected(DeviceId deviceId) { | 237 | public void deviceDisconnected(DeviceId deviceId) { |
212 | checkNotNull(deviceId, DEVICE_ID_NULL); | 238 | checkNotNull(deviceId, DEVICE_ID_NULL); |
213 | checkValidity(); | 239 | checkValidity(); |
240 | + if (!mastershipService.getLocalRole(deviceId).equals(MastershipRole.MASTER)) { | ||
241 | + log.debug("Device {} disconnected, but I am not the master", deviceId); | ||
242 | + return; | ||
243 | + } | ||
214 | DeviceEvent event = store.markOffline(deviceId); | 244 | DeviceEvent event = store.markOffline(deviceId); |
215 | 245 | ||
216 | //we're no longer capable of mastership. | 246 | //we're no longer capable of mastership. |
... | @@ -272,9 +302,15 @@ public class DeviceManager | ... | @@ -272,9 +302,15 @@ public class DeviceManager |
272 | @Override | 302 | @Override |
273 | public void event(MastershipEvent event) { | 303 | public void event(MastershipEvent event) { |
274 | if (event.master().equals(clusterService.getLocalNode().id())) { | 304 | if (event.master().equals(clusterService.getLocalNode().id())) { |
305 | + | ||
275 | MastershipTerm term = mastershipService.requestTermService() | 306 | MastershipTerm term = mastershipService.requestTermService() |
276 | .getMastershipTerm(event.subject()); | 307 | .getMastershipTerm(event.subject()); |
277 | - clockProviderService.setMastershipTerm(event.subject(), term); | 308 | + |
309 | + if (term.master().equals(clusterService.getLocalNode().id())) { | ||
310 | + // only set if I am the master | ||
311 | + clockProviderService.setMastershipTerm(event.subject(), term); | ||
312 | + } | ||
313 | + | ||
278 | applyRole(event.subject(), MastershipRole.MASTER); | 314 | applyRole(event.subject(), MastershipRole.MASTER); |
279 | } else { | 315 | } else { |
280 | applyRole(event.subject(), MastershipRole.STANDBY); | 316 | applyRole(event.subject(), MastershipRole.STANDBY); | ... | ... |
... | @@ -113,6 +113,7 @@ public class GossipDeviceStore | ... | @@ -113,6 +113,7 @@ public class GossipDeviceStore |
113 | protected ClusterService clusterService; | 113 | protected ClusterService clusterService; |
114 | 114 | ||
115 | private static final KryoSerializer SERIALIZER = new KryoSerializer() { | 115 | private static final KryoSerializer SERIALIZER = new KryoSerializer() { |
116 | + @Override | ||
116 | protected void setupKryoPool() { | 117 | protected void setupKryoPool() { |
117 | serializerPool = KryoPool.newBuilder() | 118 | serializerPool = KryoPool.newBuilder() |
118 | .register(KryoPoolUtil.API) | 119 | .register(KryoPoolUtil.API) | ... | ... |
-
Please register or login to post a comment