Forward Device remove to MASTER
Change-Id: I5bf0fc76ffd04d5a1165d24e82eedd7993ec95be
Showing
2 changed files
with
59 additions
and
5 deletions
... | @@ -32,6 +32,8 @@ import org.onlab.onos.cluster.ClusterService; | ... | @@ -32,6 +32,8 @@ import org.onlab.onos.cluster.ClusterService; |
32 | import org.onlab.onos.cluster.ControllerNode; | 32 | import org.onlab.onos.cluster.ControllerNode; |
33 | import org.onlab.onos.cluster.NodeId; | 33 | import org.onlab.onos.cluster.NodeId; |
34 | import org.onlab.onos.mastership.MastershipService; | 34 | import org.onlab.onos.mastership.MastershipService; |
35 | +import org.onlab.onos.mastership.MastershipTerm; | ||
36 | +import org.onlab.onos.mastership.MastershipTermService; | ||
35 | import org.onlab.onos.net.AnnotationsUtil; | 37 | import org.onlab.onos.net.AnnotationsUtil; |
36 | import org.onlab.onos.net.DefaultAnnotations; | 38 | import org.onlab.onos.net.DefaultAnnotations; |
37 | import org.onlab.onos.net.DefaultDevice; | 39 | import org.onlab.onos.net.DefaultDevice; |
... | @@ -39,6 +41,7 @@ import org.onlab.onos.net.DefaultPort; | ... | @@ -39,6 +41,7 @@ import org.onlab.onos.net.DefaultPort; |
39 | import org.onlab.onos.net.Device; | 41 | import org.onlab.onos.net.Device; |
40 | import org.onlab.onos.net.Device.Type; | 42 | import org.onlab.onos.net.Device.Type; |
41 | import org.onlab.onos.net.DeviceId; | 43 | import org.onlab.onos.net.DeviceId; |
44 | +import org.onlab.onos.net.MastershipRole; | ||
42 | import org.onlab.onos.net.Port; | 45 | import org.onlab.onos.net.Port; |
43 | import org.onlab.onos.net.PortNumber; | 46 | import org.onlab.onos.net.PortNumber; |
44 | import org.onlab.onos.net.device.DeviceClockService; | 47 | import org.onlab.onos.net.device.DeviceClockService; |
... | @@ -89,6 +92,7 @@ import static com.google.common.base.Verify.verify; | ... | @@ -89,6 +92,7 @@ import static com.google.common.base.Verify.verify; |
89 | import static org.onlab.util.Tools.namedThreads; | 92 | import static org.onlab.util.Tools.namedThreads; |
90 | import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; | 93 | import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; |
91 | import static org.onlab.onos.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE; | 94 | import static org.onlab.onos.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE; |
95 | +import static org.onlab.onos.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_REMOVE_REQ; | ||
92 | 96 | ||
93 | // TODO: give me a better name | 97 | // TODO: give me a better name |
94 | /** | 98 | /** |
... | @@ -160,6 +164,7 @@ public class GossipDeviceStore | ... | @@ -160,6 +164,7 @@ public class GossipDeviceStore |
160 | GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener()); | 164 | GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener()); |
161 | clusterCommunicator.addSubscriber( | 165 | clusterCommunicator.addSubscriber( |
162 | GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, new InternalDeviceOfflineEventListener()); | 166 | GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, new InternalDeviceOfflineEventListener()); |
167 | + clusterCommunicator.addSubscriber(DEVICE_REMOVE_REQ, new InternalRemoveRequestListener()); | ||
163 | clusterCommunicator.addSubscriber( | 168 | clusterCommunicator.addSubscriber( |
164 | GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener()); | 169 | GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener()); |
165 | clusterCommunicator.addSubscriber( | 170 | clusterCommunicator.addSubscriber( |
... | @@ -715,14 +720,48 @@ public class GossipDeviceStore | ... | @@ -715,14 +720,48 @@ public class GossipDeviceStore |
715 | 720 | ||
716 | @Override | 721 | @Override |
717 | public synchronized DeviceEvent removeDevice(DeviceId deviceId) { | 722 | public synchronized DeviceEvent removeDevice(DeviceId deviceId) { |
718 | - final NodeId master = mastershipService.getMasterFor(deviceId); | 723 | + final NodeId myId = clusterService.getLocalNode().id(); |
719 | - if (!clusterService.getLocalNode().id().equals(master)) { | 724 | + NodeId master = mastershipService.getMasterFor(deviceId); |
720 | - log.info("Removal of device {} requested on non master node", deviceId); | 725 | + |
721 | - // FIXME silently ignoring. Should be forwarding or broadcasting to | 726 | + // if there exist a master, forward |
722 | - // master. | 727 | + // if there is no master, try to become one and process |
728 | + | ||
729 | + boolean relinquishAtEnd = false; | ||
730 | + if (master == null) { | ||
731 | + final MastershipRole myRole = mastershipService.getLocalRole(deviceId); | ||
732 | + if (myRole != MastershipRole.NONE) { | ||
733 | + relinquishAtEnd = true; | ||
734 | + } | ||
735 | + log.info("Temporarlily requesting role for {} to remove", deviceId); | ||
736 | + mastershipService.requestRoleFor(deviceId); | ||
737 | + MastershipTermService termService = mastershipService.requestTermService(); | ||
738 | + MastershipTerm term = termService.getMastershipTerm(deviceId); | ||
739 | + if (myId.equals(term.master())) { | ||
740 | + master = myId; | ||
741 | + } | ||
742 | + } | ||
743 | + | ||
744 | + if (!myId.equals(master)) { | ||
745 | + log.info("{} has control of {}, forwarding remove request", | ||
746 | + master, deviceId); | ||
747 | + | ||
748 | + ClusterMessage message = new ClusterMessage( | ||
749 | + myId, | ||
750 | + DEVICE_REMOVE_REQ, | ||
751 | + SERIALIZER.encode(deviceId)); | ||
752 | + | ||
753 | + try { | ||
754 | + clusterCommunicator.unicast(message, master); | ||
755 | + } catch (IOException e) { | ||
756 | + log.error("Failed to forward {} remove request to {}", deviceId, master, e); | ||
757 | + } | ||
758 | + | ||
759 | + // event will be triggered after master processes it. | ||
723 | return null; | 760 | return null; |
724 | } | 761 | } |
725 | 762 | ||
763 | + // I have control.. | ||
764 | + | ||
726 | Timestamp timestamp = deviceClockService.getTimestamp(deviceId); | 765 | Timestamp timestamp = deviceClockService.getTimestamp(deviceId); |
727 | DeviceEvent event = removeDeviceInternal(deviceId, timestamp); | 766 | DeviceEvent event = removeDeviceInternal(deviceId, timestamp); |
728 | if (event != null) { | 767 | if (event != null) { |
... | @@ -735,6 +774,10 @@ public class GossipDeviceStore | ... | @@ -735,6 +774,10 @@ public class GossipDeviceStore |
735 | deviceId); | 774 | deviceId); |
736 | } | 775 | } |
737 | } | 776 | } |
777 | + if (relinquishAtEnd) { | ||
778 | + log.info("Relinquishing temporary role acquired for {}", deviceId); | ||
779 | + mastershipService.relinquishMastership(deviceId); | ||
780 | + } | ||
738 | return event; | 781 | return event; |
739 | } | 782 | } |
740 | 783 | ||
... | @@ -1241,6 +1284,16 @@ public class GossipDeviceStore | ... | @@ -1241,6 +1284,16 @@ public class GossipDeviceStore |
1241 | } | 1284 | } |
1242 | } | 1285 | } |
1243 | 1286 | ||
1287 | + private final class InternalRemoveRequestListener | ||
1288 | + implements ClusterMessageHandler { | ||
1289 | + @Override | ||
1290 | + public void handle(ClusterMessage message) { | ||
1291 | + log.debug("Received device remove request from peer: {}", message.sender()); | ||
1292 | + DeviceId did = SERIALIZER.decode(message.payload()); | ||
1293 | + removeDevice(did); | ||
1294 | + } | ||
1295 | + } | ||
1296 | + | ||
1244 | private class InternalDeviceRemovedEventListener implements ClusterMessageHandler { | 1297 | private class InternalDeviceRemovedEventListener implements ClusterMessageHandler { |
1245 | @Override | 1298 | @Override |
1246 | public void handle(ClusterMessage message) { | 1299 | public void handle(ClusterMessage message) { | ... | ... |
core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStoreMessageSubjects.java
... | @@ -27,6 +27,7 @@ public final class GossipDeviceStoreMessageSubjects { | ... | @@ -27,6 +27,7 @@ public final class GossipDeviceStoreMessageSubjects { |
27 | 27 | ||
28 | public static final MessageSubject DEVICE_UPDATE = new MessageSubject("peer-device-update"); | 28 | public static final MessageSubject DEVICE_UPDATE = new MessageSubject("peer-device-update"); |
29 | public static final MessageSubject DEVICE_OFFLINE = new MessageSubject("peer-device-offline"); | 29 | public static final MessageSubject DEVICE_OFFLINE = new MessageSubject("peer-device-offline"); |
30 | + public static final MessageSubject DEVICE_REMOVE_REQ = new MessageSubject("peer-device-remove-request"); | ||
30 | public static final MessageSubject DEVICE_REMOVED = new MessageSubject("peer-device-removed"); | 31 | public static final MessageSubject DEVICE_REMOVED = new MessageSubject("peer-device-removed"); |
31 | public static final MessageSubject PORT_UPDATE = new MessageSubject("peer-port-update"); | 32 | public static final MessageSubject PORT_UPDATE = new MessageSubject("peer-port-update"); |
32 | public static final MessageSubject PORT_STATUS_UPDATE = new MessageSubject("peer-port-status-update"); | 33 | public static final MessageSubject PORT_STATUS_UPDATE = new MessageSubject("peer-port-status-update"); | ... | ... |
-
Please register or login to post a comment