DeviceManager: background process checking mastership
Change-Id: I215a2f6b585077847821b9e36953c53e43fde6c3
Showing
1 changed file
with
146 additions
and
82 deletions
... | @@ -18,9 +18,13 @@ package org.onlab.onos.net.device.impl; | ... | @@ -18,9 +18,13 @@ package org.onlab.onos.net.device.impl; |
18 | import static com.google.common.base.Preconditions.checkNotNull; | 18 | import static com.google.common.base.Preconditions.checkNotNull; |
19 | import static org.onlab.onos.net.device.DeviceEvent.Type.DEVICE_MASTERSHIP_CHANGED; | 19 | import static org.onlab.onos.net.device.DeviceEvent.Type.DEVICE_MASTERSHIP_CHANGED; |
20 | import static org.onlab.onos.net.MastershipRole.*; | 20 | import static org.onlab.onos.net.MastershipRole.*; |
21 | +import static org.onlab.util.Tools.namedThreads; | ||
21 | import static org.slf4j.LoggerFactory.getLogger; | 22 | import static org.slf4j.LoggerFactory.getLogger; |
22 | 23 | ||
23 | import java.util.List; | 24 | import java.util.List; |
25 | +import java.util.concurrent.Executors; | ||
26 | +import java.util.concurrent.ScheduledExecutorService; | ||
27 | +import java.util.concurrent.TimeUnit; | ||
24 | 28 | ||
25 | import org.apache.felix.scr.annotations.Activate; | 29 | import org.apache.felix.scr.annotations.Activate; |
26 | import org.apache.felix.scr.annotations.Component; | 30 | import org.apache.felix.scr.annotations.Component; |
... | @@ -83,6 +87,8 @@ public class DeviceManager | ... | @@ -83,6 +87,8 @@ public class DeviceManager |
83 | 87 | ||
84 | private final MastershipListener mastershipListener = new InternalMastershipListener(); | 88 | private final MastershipListener mastershipListener = new InternalMastershipListener(); |
85 | 89 | ||
90 | + private ScheduledExecutorService backgroundService; | ||
91 | + | ||
86 | @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) | 92 | @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) |
87 | protected DeviceStore store; | 93 | protected DeviceStore store; |
88 | 94 | ||
... | @@ -102,15 +108,31 @@ public class DeviceManager | ... | @@ -102,15 +108,31 @@ public class DeviceManager |
102 | 108 | ||
103 | @Activate | 109 | @Activate |
104 | public void activate() { | 110 | public void activate() { |
111 | + backgroundService = Executors.newSingleThreadScheduledExecutor(namedThreads("device-manager-background")); | ||
112 | + | ||
105 | store.setDelegate(delegate); | 113 | store.setDelegate(delegate); |
106 | eventDispatcher.addSink(DeviceEvent.class, listenerRegistry); | 114 | eventDispatcher.addSink(DeviceEvent.class, listenerRegistry); |
107 | mastershipService.addListener(mastershipListener); | 115 | mastershipService.addListener(mastershipListener); |
108 | termService = mastershipService.requestTermService(); | 116 | termService = mastershipService.requestTermService(); |
117 | + | ||
118 | + backgroundService.scheduleWithFixedDelay(new Runnable() { | ||
119 | + | ||
120 | + @Override | ||
121 | + public void run() { | ||
122 | + try { | ||
123 | + mastershipCheck(); | ||
124 | + } catch (Exception e) { | ||
125 | + log.error("Exception thrown during integrity check", e); | ||
126 | + } | ||
127 | + } | ||
128 | + }, 1, 1, TimeUnit.MINUTES); | ||
109 | log.info("Started"); | 129 | log.info("Started"); |
110 | } | 130 | } |
111 | 131 | ||
112 | @Deactivate | 132 | @Deactivate |
113 | public void deactivate() { | 133 | public void deactivate() { |
134 | + backgroundService.shutdown(); | ||
135 | + | ||
114 | store.unsetDelegate(delegate); | 136 | store.unsetDelegate(delegate); |
115 | mastershipService.removeListener(mastershipListener); | 137 | mastershipService.removeListener(mastershipListener); |
116 | eventDispatcher.removeSink(DeviceEvent.class); | 138 | eventDispatcher.removeSink(DeviceEvent.class); |
... | @@ -172,10 +194,6 @@ public class DeviceManager | ... | @@ -172,10 +194,6 @@ public class DeviceManager |
172 | @Override | 194 | @Override |
173 | public void removeDevice(DeviceId deviceId) { | 195 | public void removeDevice(DeviceId deviceId) { |
174 | checkNotNull(deviceId, DEVICE_ID_NULL); | 196 | checkNotNull(deviceId, DEVICE_ID_NULL); |
175 | - // XXX is this intended to apply to the full global topology? | ||
176 | - // if so, we probably don't want the fact that we aren't | ||
177 | - // MASTER to get in the way, as it would do now. | ||
178 | - // FIXME: forward or broadcast and let the Master handler the event. | ||
179 | DeviceEvent event = store.removeDevice(deviceId); | 197 | DeviceEvent event = store.removeDevice(deviceId); |
180 | if (event != null) { | 198 | if (event != null) { |
181 | log.info("Device {} administratively removed", deviceId); | 199 | log.info("Device {} administratively removed", deviceId); |
... | @@ -199,6 +217,31 @@ public class DeviceManager | ... | @@ -199,6 +217,31 @@ public class DeviceManager |
199 | return new InternalDeviceProviderService(provider); | 217 | return new InternalDeviceProviderService(provider); |
200 | } | 218 | } |
201 | 219 | ||
220 | + /** | ||
221 | + * Checks if all the reachable devices have a valid mastership role. | ||
222 | + */ | ||
223 | + private void mastershipCheck() { | ||
224 | + log.debug("Checking mastership"); | ||
225 | + for (Device device : getDevices()) { | ||
226 | + final DeviceId deviceId = device.id(); | ||
227 | + log.debug("Checking device {}", deviceId); | ||
228 | + | ||
229 | + if (!isReachable(deviceId)) { | ||
230 | + continue; | ||
231 | + } | ||
232 | + | ||
233 | + if (mastershipService.getLocalRole(deviceId) != NONE) { | ||
234 | + continue; | ||
235 | + } | ||
236 | + | ||
237 | + log.info("{} is reachable but did not have a valid role, reasserting", deviceId); | ||
238 | + | ||
239 | + // isReachable but was not MASTER or STANDBY, get a role and apply | ||
240 | + // Note: NONE triggers request to MastershipService | ||
241 | + reassertRole(deviceId, NONE); | ||
242 | + } | ||
243 | + } | ||
244 | + | ||
202 | // Personalized device provider service issued to the supplied provider. | 245 | // Personalized device provider service issued to the supplied provider. |
203 | private class InternalDeviceProviderService | 246 | private class InternalDeviceProviderService |
204 | extends AbstractProviderService<DeviceProvider> | 247 | extends AbstractProviderService<DeviceProvider> |
... | @@ -418,48 +461,112 @@ public class DeviceManager | ... | @@ -418,48 +461,112 @@ public class DeviceManager |
418 | } | 461 | } |
419 | } | 462 | } |
420 | 463 | ||
421 | - // Intercepts mastership events | 464 | + // Applies the specified role to the device; ignores NONE |
422 | - private class InternalMastershipListener implements MastershipListener { | 465 | + /** |
466 | + * Apply role to device and send probe if MASTER. | ||
467 | + * | ||
468 | + * @param deviceId device identifier | ||
469 | + * @param newRole new role to apply to the device | ||
470 | + * @return true if the request was sent to provider | ||
471 | + */ | ||
472 | + private boolean applyRoleAndProbe(DeviceId deviceId, MastershipRole newRole) { | ||
473 | + if (newRole.equals(MastershipRole.NONE)) { | ||
474 | + //no-op | ||
475 | + return true; | ||
476 | + } | ||
423 | 477 | ||
424 | - // Applies the specified role to the device; ignores NONE | 478 | + Device device = store.getDevice(deviceId); |
425 | - /** | 479 | + // FIXME: Device might not be there yet. (eventual consistent) |
426 | - * Apply role in reaction to mastership event. | 480 | + // FIXME relinquish role |
427 | - * | 481 | + if (device == null) { |
428 | - * @param deviceId device identifier | 482 | + log.warn("{} was not there. Cannot apply role {}", deviceId, newRole); |
429 | - * @param newRole new role to apply to the device | 483 | + return false; |
430 | - * @return true if the request was sent to provider | 484 | + } |
431 | - */ | ||
432 | - private boolean applyRole(DeviceId deviceId, MastershipRole newRole) { | ||
433 | - if (newRole.equals(MastershipRole.NONE)) { | ||
434 | - //no-op | ||
435 | - return true; | ||
436 | - } | ||
437 | 485 | ||
438 | - Device device = store.getDevice(deviceId); | 486 | + DeviceProvider provider = getProvider(device.providerId()); |
439 | - // FIXME: Device might not be there yet. (eventual consistent) | 487 | + if (provider == null) { |
440 | - // FIXME relinquish role | 488 | + log.warn("Provider for {} was not found. Cannot apply role {}", deviceId, newRole); |
441 | - if (device == null) { | 489 | + return false; |
442 | - log.warn("{} was not there. Cannot apply role {}", deviceId, newRole); | 490 | + } |
443 | - return false; | 491 | + provider.roleChanged(deviceId, newRole); |
444 | - } | ||
445 | 492 | ||
446 | - DeviceProvider provider = getProvider(device.providerId()); | 493 | + if (newRole.equals(MastershipRole.MASTER)) { |
447 | - if (provider == null) { | 494 | + // only trigger event when request was sent to provider |
448 | - log.warn("Provider for {} was not found. Cannot apply role {}", deviceId, newRole); | 495 | + // TODO: consider removing this from Device event type? |
449 | - return false; | 496 | + post(new DeviceEvent(DEVICE_MASTERSHIP_CHANGED, device)); |
450 | - } | ||
451 | - provider.roleChanged(deviceId, newRole); | ||
452 | 497 | ||
453 | - if (newRole.equals(MastershipRole.MASTER)) { | 498 | + provider.triggerProbe(device); |
454 | - // only trigger event when request was sent to provider | 499 | + } |
455 | - // TODO: consider removing this from Device event type? | 500 | + return true; |
456 | - post(new DeviceEvent(DEVICE_MASTERSHIP_CHANGED, device)); | 501 | + } |
457 | 502 | ||
458 | - provider.triggerProbe(device); | 503 | + /** |
504 | + * Reassert role for specified device connected to this node. | ||
505 | + * | ||
506 | + * @param did device identifier | ||
507 | + * @param nextRole role to apply. If NONE is specified, | ||
508 | + * it will ask mastership service for a role and apply it. | ||
509 | + */ | ||
510 | + private void reassertRole(final DeviceId did, | ||
511 | + final MastershipRole nextRole) { | ||
512 | + | ||
513 | + final NodeId myNodeId = clusterService.getLocalNode().id(); | ||
514 | + MastershipRole myNextRole = nextRole; | ||
515 | + if (myNextRole == NONE) { | ||
516 | + mastershipService.requestRoleFor(did); | ||
517 | + MastershipTerm term = termService.getMastershipTerm(did); | ||
518 | + if (myNodeId.equals(term.master())) { | ||
519 | + myNextRole = MASTER; | ||
520 | + } else { | ||
521 | + myNextRole = STANDBY; | ||
459 | } | 522 | } |
460 | - return true; | ||
461 | } | 523 | } |
462 | 524 | ||
525 | + switch (myNextRole) { | ||
526 | + case MASTER: | ||
527 | + final Device device = getDevice(did); | ||
528 | + if ((device != null) && !isAvailable(did)) { | ||
529 | + //flag the device as online. Is there a better way to do this? | ||
530 | + DefaultDeviceDescription deviceDescription | ||
531 | + = new DefaultDeviceDescription(did.uri(), | ||
532 | + device.type(), | ||
533 | + device.manufacturer(), | ||
534 | + device.hwVersion(), | ||
535 | + device.swVersion(), | ||
536 | + device.serialNumber(), | ||
537 | + device.chassisId()); | ||
538 | + DeviceEvent devEvent = | ||
539 | + store.createOrUpdateDevice(device.providerId(), did, | ||
540 | + deviceDescription); | ||
541 | + post(devEvent); | ||
542 | + } | ||
543 | + // TODO: should apply role only if there is mismatch | ||
544 | + log.info("Applying role {} to {}", myNextRole, did); | ||
545 | + if (!applyRoleAndProbe(did, MASTER)) { | ||
546 | + // immediately failed to apply role | ||
547 | + mastershipService.relinquishMastership(did); | ||
548 | + // FIXME disconnect? | ||
549 | + } | ||
550 | + break; | ||
551 | + case STANDBY: | ||
552 | + log.info("Applying role {} to {}", myNextRole, did); | ||
553 | + if (!applyRoleAndProbe(did, STANDBY)) { | ||
554 | + // immediately failed to apply role | ||
555 | + mastershipService.relinquishMastership(did); | ||
556 | + // FIXME disconnect? | ||
557 | + } | ||
558 | + break; | ||
559 | + case NONE: | ||
560 | + default: | ||
561 | + // should never reach here | ||
562 | + log.error("You didn't see anything. I did not exist."); | ||
563 | + break; | ||
564 | + } | ||
565 | + } | ||
566 | + | ||
567 | + // Intercepts mastership events | ||
568 | + private class InternalMastershipListener implements MastershipListener { | ||
569 | + | ||
463 | @Override | 570 | @Override |
464 | public void event(MastershipEvent event) { | 571 | public void event(MastershipEvent event) { |
465 | 572 | ||
... | @@ -499,55 +606,12 @@ public class DeviceManager | ... | @@ -499,55 +606,12 @@ public class DeviceManager |
499 | + "Relinquishing role. ", | 606 | + "Relinquishing role. ", |
500 | myNextRole, did); | 607 | myNextRole, did); |
501 | mastershipService.relinquishMastership(did); | 608 | mastershipService.relinquishMastership(did); |
502 | - // FIXME disconnect? | ||
503 | } | 609 | } |
504 | return; | 610 | return; |
505 | } | 611 | } |
506 | 612 | ||
507 | // device is connected to this node: | 613 | // device is connected to this node: |
508 | - | 614 | + reassertRole(did, myNextRole); |
509 | - if (myNextRole == NONE) { | ||
510 | - mastershipService.requestRoleFor(did); | ||
511 | - MastershipTerm term = termService.getMastershipTerm(did); | ||
512 | - if (myNodeId.equals(term.master())) { | ||
513 | - myNextRole = MASTER; | ||
514 | - } else { | ||
515 | - myNextRole = STANDBY; | ||
516 | - } | ||
517 | - } | ||
518 | - | ||
519 | - switch (myNextRole) { | ||
520 | - case MASTER: | ||
521 | - final Device device = getDevice(did); | ||
522 | - if ((device != null) && !isAvailable(did)) { | ||
523 | - //flag the device as online. Is there a better way to do this? | ||
524 | - DefaultDeviceDescription deviceDescription | ||
525 | - = new DefaultDeviceDescription(did.uri(), | ||
526 | - device.type(), | ||
527 | - device.manufacturer(), | ||
528 | - device.hwVersion(), | ||
529 | - device.swVersion(), | ||
530 | - device.serialNumber(), | ||
531 | - device.chassisId()); | ||
532 | - DeviceEvent devEvent = | ||
533 | - store.createOrUpdateDevice(device.providerId(), did, | ||
534 | - deviceDescription); | ||
535 | - post(devEvent); | ||
536 | - } | ||
537 | - // TODO: should apply role only if there is mismatch | ||
538 | - log.info("Applying role {} to {}", myNextRole, did); | ||
539 | - applyRole(did, MASTER); | ||
540 | - break; | ||
541 | - case STANDBY: | ||
542 | - log.info("Applying role {} to {}", myNextRole, did); | ||
543 | - applyRole(did, STANDBY); | ||
544 | - break; | ||
545 | - case NONE: | ||
546 | - default: | ||
547 | - // should never reach here | ||
548 | - log.error("You didn't see anything. I did not exist."); | ||
549 | - break; | ||
550 | - } | ||
551 | } | 615 | } |
552 | } | 616 | } |
553 | 617 | ... | ... |
-
Please register or login to post a comment