Hyunsun Moon
Committed by Jonathan Hart

Support existing VMs running on the newly added node

- Added openstack node state event
- Made openstack switching to listen to the node state events and
  add existing VMs running in the complete state node

Change-Id: I7b7186c3b889376a4bc0385313433604dcd93d70
......@@ -56,6 +56,10 @@ import org.onosproject.openstackinterface.OpenstackPort;
import org.onosproject.openstackinterface.OpenstackSubnet;
import org.onosproject.openstacknetworking.OpenstackPortInfo;
import org.onosproject.openstacknetworking.OpenstackSwitchingService;
import org.onosproject.openstacknode.OpenstackNode;
import org.onosproject.openstacknode.OpenstackNodeEvent;
import org.onosproject.openstacknode.OpenstackNodeListener;
import org.onosproject.openstacknode.OpenstackNodeService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -103,11 +107,15 @@ public final class OpenstackSwitchingManager extends AbstractProvider
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected OpenstackInterfaceService openstackService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected OpenstackNodeService openstackNodeService;
private final ExecutorService deviceEventExecutor =
Executors.newSingleThreadExecutor(groupedThreads("onos/openstackswitching", "device-event"));
private final ExecutorService configEventExecutor =
Executors.newSingleThreadExecutor(groupedThreads("onos/openstackswitching", "config-event"));
private final InternalDeviceListener internalDeviceListener = new InternalDeviceListener();
private final InternalOpenstackNodeListener internalNodeListener = new InternalOpenstackNodeListener();
private HostProviderService hostProvider;
......@@ -122,6 +130,7 @@ public final class OpenstackSwitchingManager extends AbstractProvider
protected void activate() {
coreService.registerApplication(APP_ID);
deviceService.addListener(internalDeviceListener);
openstackNodeService.addListener(internalNodeListener);
hostProvider = hostProviderRegistry.register(this);
log.info("Started");
......@@ -131,6 +140,7 @@ public final class OpenstackSwitchingManager extends AbstractProvider
protected void deactivate() {
hostProviderRegistry.unregister(this);
deviceService.removeListener(internalDeviceListener);
openstackNodeService.removeListener(internalNodeListener);
deviceEventExecutor.shutdown();
configEventExecutor.shutdown();
......@@ -179,6 +189,7 @@ public final class OpenstackSwitchingManager extends AbstractProvider
}
private void processPortAdded(Port port) {
// TODO check the node state is COMPLETE
OpenstackPort osPort = openstackService.port(port);
if (osPort == null) {
log.warn("Failed to get OpenStack port for {}", port);
......@@ -221,6 +232,10 @@ public final class OpenstackSwitchingManager extends AbstractProvider
private void processPortRemoved(Port port) {
ConnectPoint connectPoint = new ConnectPoint(port.element().id(), port.number());
removeHosts(connectPoint);
}
private void removeHosts(ConnectPoint connectPoint) {
hostService.getConnectedHosts(connectPoint).stream()
.forEach(host -> {
dhcpService.removeStaticMapping(host.mac());
......@@ -300,4 +315,45 @@ public final class OpenstackSwitchingManager extends AbstractProvider
}
}
}
private class InternalOpenstackNodeListener implements OpenstackNodeListener {
@Override
public void event(OpenstackNodeEvent event) {
OpenstackNode node = event.node();
// TODO check leadership of the node and make only the leader process
switch (event.type()) {
case COMPLETE:
log.info("COMPLETE node {} detected", node.hostname());
// adds existing VMs running on the complete state node
deviceService.getPorts(node.intBridge()).stream()
.filter(port -> port.annotations().value(PORT_NAME)
.startsWith(PORTNAME_PREFIX_VM) &&
port.isEnabled())
.forEach(port -> {
deviceEventExecutor.execute(() -> processPortAdded(port));
log.info("VM is detected on {}", port);
});
// removes stale VMs
hostService.getHosts().forEach(host -> {
if (deviceService.getPort(host.location().deviceId(),
host.location().port()) == null) {
deviceEventExecutor.execute(() -> removeHosts(host.location()));
log.info("Removed stale VM {}", host.location());
}
});
break;
case INCOMPLETE:
log.warn("{} is changed to INCOMPLETE state", node);
break;
case INIT:
case DEVICE_CREATED:
default:
break;
}
}
}
}
......
......@@ -19,6 +19,7 @@ import com.google.common.base.MoreObjects;
import com.google.common.base.Strings;
import org.onlab.packet.IpAddress;
import org.onosproject.net.DeviceId;
import org.onosproject.openstacknode.OpenstackNodeEvent.NodeState;
import org.onosproject.openstacknode.OpenstackNodeService.NodeType;
import java.util.Comparator;
......@@ -27,6 +28,7 @@ import java.util.Optional;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onosproject.openstacknode.OpenstackNodeEvent.NodeState.INIT;
/**
* Representation of a compute/gateway node for OpenstackSwitching/Routing service.
......@@ -39,7 +41,7 @@ public final class OpenstackNode {
private final IpAddress dataIp;
private final DeviceId integrationBridge;
private final Optional<DeviceId> routerBridge;
private final OpenstackNodeState state;
private final NodeState state;
public static final Comparator<OpenstackNode> OPENSTACK_NODE_COMPARATOR =
(node1, node2) -> node1.hostname().compareTo(node2.hostname());
......@@ -50,7 +52,7 @@ public final class OpenstackNode {
IpAddress dataIp,
DeviceId integrationBridge,
Optional<DeviceId> routerBridge,
OpenstackNodeState state) {
NodeState state) {
this.hostname = hostname;
this.type = type;
this.managementIp = managementIp;
......@@ -67,7 +69,7 @@ public final class OpenstackNode {
* @param state openstack node init state
* @return openstack node
*/
public static OpenstackNode getUpdatedNode(OpenstackNode node, OpenstackNodeState state) {
public static OpenstackNode getUpdatedNode(OpenstackNode node, NodeState state) {
return new OpenstackNode(node.hostname,
node.type,
node.managementIp,
......@@ -137,7 +139,7 @@ public final class OpenstackNode {
*
* @return init state
*/
public OpenstackNodeState state() {
public NodeState state() {
return state;
}
......@@ -212,7 +214,7 @@ public final class OpenstackNode {
private IpAddress dataIp;
private DeviceId integrationBridge;
private Optional<DeviceId> routerBridge = Optional.empty();
private OpenstackNodeState state = OpenstackNodeState.noState();
private NodeState state = INIT;
private Builder() {
}
......@@ -305,7 +307,7 @@ public final class OpenstackNode {
* @param state node init state
* @return openstack node builder
*/
public Builder state(OpenstackNodeState state) {
public Builder state(NodeState state) {
this.state = state;
return this;
}
......
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.openstacknode;
import org.joda.time.LocalDateTime;
import org.onosproject.event.AbstractEvent;
import static com.google.common.base.MoreObjects.toStringHelper;
/**
* Describes OpenStack node init state event.
*/
public class OpenstackNodeEvent extends AbstractEvent<OpenstackNodeEvent.NodeState, Object> {
public enum NodeState {
/**
* Indicates the node is newly added.
*/
INIT {
@Override
public void process(OpenstackNodeService nodeService, OpenstackNode node) {
nodeService.processInitState(node);
}
},
/**
* Indicates bridge devices are added according to the node state.
*/
DEVICE_CREATED {
@Override
public void process(OpenstackNodeService nodeService, OpenstackNode node) {
nodeService.processDeviceCreatedState(node);
}
},
/**
* Indicates all node initialization is done.
*/
COMPLETE {
@Override
public void process(OpenstackNodeService nodeService, OpenstackNode node) {
nodeService.processCompleteState(node);
}
},
/**
* Indicates node initialization is not done but unable to proceed to
* the next step for some reason.
*/
INCOMPLETE {
@Override
public void process(OpenstackNodeService nodeService, OpenstackNode node) {
nodeService.processIncompleteState(node);
}
};
public abstract void process(OpenstackNodeService nodeService, OpenstackNode node);
}
public OpenstackNodeEvent(NodeState state, Object subject) {
super(state, subject);
}
public OpenstackNode node() {
return (OpenstackNode) subject();
}
@Override
public String toString() {
return toStringHelper(this)
.add("time", new LocalDateTime(time()))
.add("state", type())
.add("node", subject())
.toString();
}
}
......@@ -15,16 +15,10 @@
*/
package org.onosproject.openstacknode;
import org.onosproject.event.EventListener;
/**
* Entity that defines possible init state of the OpenStack node.
*/
public interface OpenstackNodeState {
/**
* Returns null for no state.
*
* @return null
* Listener for OpenStack node events.
*/
static OpenstackNodeState noState() {
return null;
}
public interface OpenstackNodeListener extends EventListener<OpenstackNodeEvent> {
}
......
......@@ -34,6 +34,7 @@ import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.event.ListenerRegistry;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Port;
......@@ -57,6 +58,7 @@ import org.onosproject.net.config.basics.SubjectFactories;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService;
import org.onosproject.openstacknode.OpenstackNodeEvent.NodeState;
import org.onosproject.ovsdb.controller.OvsdbClientService;
import org.onosproject.ovsdb.controller.OvsdbController;
import org.onosproject.ovsdb.controller.OvsdbNodeId;
......@@ -76,6 +78,7 @@ import static org.onosproject.net.AnnotationKeys.PORT_NAME;
import static org.onosproject.net.Device.Type.SWITCH;
import static org.onosproject.net.behaviour.TunnelDescription.Type.VXLAN;
import static org.onosproject.openstacknode.Constants.*;
import static org.onosproject.openstacknode.OpenstackNodeEvent.NodeState.*;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Dictionary;
......@@ -92,8 +95,9 @@ import java.util.stream.Collectors;
*/
@Component(immediate = true)
@Service
public final class OpenstackNodeManager implements OpenstackNodeService {
protected final Logger log = getLogger(getClass());
public final class OpenstackNodeManager extends ListenerRegistry<OpenstackNodeEvent, OpenstackNodeListener>
implements OpenstackNodeService {
private final Logger log = getLogger(getClass());
private static final KryoNamespace.Builder NODE_SERIALIZER = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
......@@ -160,59 +164,6 @@ public final class OpenstackNodeManager implements OpenstackNodeService {
private ApplicationId appId;
private NodeId localNodeId;
private enum NodeState implements OpenstackNodeState {
INIT {
@Override
public void process(OpenstackNodeManager nodeManager, OpenstackNode node) {
// make sure there is OVSDB connection
if (!nodeManager.isOvsdbConnected(node)) {
nodeManager.connectOvsdb(node);
return;
}
nodeManager.createBridge(node,
INTEGRATION_BRIDGE,
node.intBridge().toString().substring(DPID_BEGIN));
// creates additional router bridge if the node type is GATEWAY
if (node.type().equals(NodeType.GATEWAY)) {
nodeManager.createBridge(node,
ROUTER_BRIDGE,
node.routerBridge().get().toString().substring(DPID_BEGIN));
}
}
},
BRIDGE_CREATED {
@Override
public void process(OpenstackNodeManager nodeManager, OpenstackNode node) {
// make sure there is OVSDB connection
if (!nodeManager.isOvsdbConnected(node)) {
nodeManager.connectOvsdb(node);
return;
}
nodeManager.createTunnelInterface(node);
// creates additional patch ports connecting integration bridge and
// router bridge if the node type is GATEWAY
if (node.type().equals(NodeType.GATEWAY)) {
nodeManager.createPatchInterface(node);
}
}
},
COMPLETE {
@Override
public void process(OpenstackNodeManager nodeManager, OpenstackNode node) {
nodeManager.postInit(node);
}
},
INCOMPLETE {
@Override
public void process(OpenstackNodeManager nodeManager, OpenstackNode node) {
}
};
public abstract void process(OpenstackNodeManager nodeManager, OpenstackNode node);
}
@Activate
protected void activate() {
appId = coreService.getAppId(APP_ID);
......@@ -275,6 +226,57 @@ public final class OpenstackNodeManager implements OpenstackNodeService {
controller.getOvsdbClient(ovsdb).disconnect();
}
nodeStore.remove(node.hostname());
process(new OpenstackNodeEvent(INCOMPLETE, node));
}
@Override
public void processInitState(OpenstackNode node) {
// make sure there is OVSDB connection
if (!isOvsdbConnected(node)) {
connectOvsdb(node);
return;
}
process(new OpenstackNodeEvent(INIT, node));
createBridge(node, INTEGRATION_BRIDGE,
node.intBridge().toString().substring(DPID_BEGIN));
// creates additional router bridge if the node type is GATEWAY
if (node.type().equals(NodeType.GATEWAY)) {
createBridge(node, ROUTER_BRIDGE,
node.routerBridge().get().toString().substring(DPID_BEGIN));
}
}
@Override
public void processDeviceCreatedState(OpenstackNode node) {
// make sure there is OVSDB connection
if (!isOvsdbConnected(node)) {
connectOvsdb(node);
return;
}
process(new OpenstackNodeEvent(DEVICE_CREATED, node));
createTunnelInterface(node);
// creates additional patch ports connecting integration bridge and
// router bridge if the node type is GATEWAY
if (node.type().equals(NodeType.GATEWAY)) {
createPatchInterface(node);
}
}
@Override
public void processCompleteState(OpenstackNode node) {
if (isOvsdbConnected(node)) {
OvsdbNodeId ovsdb = new OvsdbNodeId(node.managementIp(), ovsdbPort);
controller.getOvsdbClient(ovsdb).disconnect();
}
process(new OpenstackNodeEvent(COMPLETE, node));
log.info("Finished init {}", node.hostname());
}
@Override
public void processIncompleteState(OpenstackNode node) {
process(new OpenstackNodeEvent(INCOMPLETE, node));
}
@Override
......@@ -285,22 +287,11 @@ public final class OpenstackNodeManager implements OpenstackNodeService {
@Override
public Set<OpenstackNode> completeNodes() {
return nodeStore.values().stream().map(Versioned::value)
.filter(node -> node.state().equals(NodeState.COMPLETE))
.filter(node -> node.state().equals(COMPLETE))
.collect(Collectors.toSet());
}
@Override
public boolean isComplete(String hostname) {
Versioned<OpenstackNode> versionedNode = nodeStore.get(hostname);
if (versionedNode == null) {
log.warn("Node {} does not exist", hostname);
return false;
}
OpenstackNodeState state = versionedNode.value().state();
return state != null && state.equals(NodeState.COMPLETE);
}
@Override
public Optional<IpAddress> dataIp(DeviceId deviceId) {
OpenstackNode node = nodeByDeviceId(deviceId);
if (node == null) {
......@@ -337,21 +328,11 @@ public final class OpenstackNodeManager implements OpenstackNodeService {
}
private void initNode(OpenstackNode node) {
NodeState state = (NodeState) node.state();
NodeState state = node.state();
state.process(this, node);
log.debug("Processing node: {} state: {}", node.hostname(), state);
}
private void postInit(OpenstackNode node) {
if (isOvsdbConnected(node)) {
OvsdbNodeId ovsdb = new OvsdbNodeId(node.managementIp(), ovsdbPort);
controller.getOvsdbClient(ovsdb).disconnect();
}
// TODO add gateway node to scalable gateway pool
log.info("Finished init {}", node.hostname());
}
private void setNodeState(OpenstackNode node, NodeState newState) {
log.debug("Changed {} state: {}", node.hostname(), newState);
nodeStore.put(node.hostname(), OpenstackNode.getUpdatedNode(node, newState));
......@@ -359,23 +340,23 @@ public final class OpenstackNodeManager implements OpenstackNodeService {
private NodeState nodeState(OpenstackNode node) {
if (!deviceService.isAvailable(node.intBridge())) {
return NodeState.INIT;
return INIT;
}
if (node.type().equals(NodeType.GATEWAY) &&
!deviceService.isAvailable(node.routerBridge().get())) {
return NodeState.INIT;
return INIT;
}
if (!isIfaceCreated(node.intBridge(), DEFAULT_TUNNEL)) {
return NodeState.BRIDGE_CREATED;
return DEVICE_CREATED;
}
if (node.type().equals(NodeType.GATEWAY) && (
!isIfaceCreated(node.routerBridge().get(), PATCH_ROUT_BRIDGE) ||
!isIfaceCreated(node.intBridge(), PATCH_INTG_BRIDGE))) {
return NodeState.BRIDGE_CREATED;
return DEVICE_CREATED;
}
return NodeState.COMPLETE;
return COMPLETE;
}
private boolean isIfaceCreated(DeviceId deviceId, String ifaceName) {
......@@ -683,4 +664,3 @@ public final class OpenstackNodeManager implements OpenstackNodeService {
}
}
}
......
......@@ -16,6 +16,7 @@
package org.onosproject.openstacknode;
import org.onlab.packet.IpAddress;
import org.onosproject.event.ListenerService;
import org.onosproject.net.DeviceId;
import org.onosproject.net.PortNumber;
......@@ -26,7 +27,8 @@ import java.util.Set;
/**
* Handles the bootstrap request for compute/gateway node.
*/
public interface OpenstackNodeService {
public interface OpenstackNodeService
extends ListenerService<OpenstackNodeEvent, OpenstackNodeListener> {
enum NodeType {
/**
......@@ -44,6 +46,34 @@ public interface OpenstackNodeService {
void addOrUpdateNode(OpenstackNode node);
/**
* Bootstraps node with INIT state.
*
* @param node openstack node
*/
void processInitState(OpenstackNode node);
/**
* Bootstraps node with DEVICE_CREATED state.
*
* @param node openstack node
*/
void processDeviceCreatedState(OpenstackNode node);
/**
* Bootstraps node with COMPLETE state.
*
* @param node openstack node
*/
void processCompleteState(OpenstackNode node);
/**
* Bootstraps node with INCOMPLETE state.
*
* @param node openstack node
*/
void processIncompleteState(OpenstackNode node);
/**
* Deletes a node from the service.
*
* @param node openstack node
......@@ -65,14 +95,6 @@ public interface OpenstackNodeService {
Set<OpenstackNode> completeNodes();
/**
* Returns node initialization state is complete or not.
*
* @param hostname hostname of the node
* @return true if initial node setup is completed, otherwise false
*/
boolean isComplete(String hostname);
/**
* Returns data network IP address of a given integration bridge device.
*
* @param intBridgeId integration bridge device id
......
......@@ -34,9 +34,6 @@ import java.util.List;
description = "Lists all nodes registered in OpenStack node service")
public class OpenstackNodeListCommand extends AbstractShellCommand {
private static final String COMPLETE = "COMPLETE";
private static final String INCOMPLETE = "INCOMPLETE";
@Override
protected void execute() {
OpenstackNodeService nodeService = AbstractShellCommand.get(OpenstackNodeService.class);
......@@ -44,7 +41,7 @@ public class OpenstackNodeListCommand extends AbstractShellCommand {
Collections.sort(nodes, OpenstackNode.OPENSTACK_NODE_COMPARATOR);
if (outputJson()) {
print("%s", json(nodeService, nodes));
print("%s", json(nodes));
} else {
for (OpenstackNode node : nodes) {
print("hostname=%s, type=%s, managementIp=%s, dataIp=%s, intBridge=%s, routerBridge=%s init=%s",
......@@ -54,13 +51,13 @@ public class OpenstackNodeListCommand extends AbstractShellCommand {
node.dataIp(),
node.intBridge(),
node.routerBridge(),
getState(nodeService, node));
node.state());
}
print("Total %s nodes", nodeService.nodes().size());
}
}
private JsonNode json(OpenstackNodeService nodeService, List<OpenstackNode> nodes) {
private JsonNode json(List<OpenstackNode> nodes) {
ObjectMapper mapper = new ObjectMapper();
ArrayNode result = mapper.createArrayNode();
for (OpenstackNode node : nodes) {
......@@ -71,12 +68,8 @@ public class OpenstackNodeListCommand extends AbstractShellCommand {
.put("dataIp", node.dataIp().toString())
.put("intBridge", node.intBridge().toString())
.put("routerBridge", node.routerBridge().toString())
.put("state", getState(nodeService, node)));
.put("state", node.state().name()));
}
return result;
}
private String getState(OpenstackNodeService nodeService, OpenstackNode node) {
return nodeService.isComplete(node.hostname()) ? COMPLETE : INCOMPLETE;
}
}
\ No newline at end of file
......