Hyunsun Moon
Committed by Gerrit Code Review

CORD-151 Refactor cordvtn service to reduce complexity

Change-Id: I489e1d3df7f08d04d6b6a2aa23b9d4e6d7a054e4
......@@ -27,12 +27,12 @@ import java.util.Set;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* Configuration object for CORD VTN service.
* Configuration object for CordVtn service.
*/
public class CordVtnConfig extends Config<ApplicationId> {
public static final String OVSDB_NODES = "ovsdbNodes";
public static final String HOSTNAME = "hostname";
public static final String HOST = "host";
public static final String IP = "ip";
public static final String PORT = "port";
......@@ -49,7 +49,7 @@ public class CordVtnConfig extends Config<ApplicationId> {
return null;
}
nodes.forEach(jsonNode -> ovsdbNodes.add(new OvsdbNodeConfig(
jsonNode.path(HOSTNAME).asText(),
jsonNode.path(HOST).asText(),
IpAddress.valueOf(jsonNode.path(IP).asText()),
TpPort.tpPort(jsonNode.path(PORT).asInt()))));
......@@ -57,27 +57,27 @@ public class CordVtnConfig extends Config<ApplicationId> {
}
/**
* Configuration for an OVSDB node.
* Configuration for an ovsdb node.
*/
public static class OvsdbNodeConfig {
private final String hostname;
private final String host;
private final IpAddress ip;
private final TpPort port;
public OvsdbNodeConfig(String hostname, IpAddress ip, TpPort port) {
this.hostname = checkNotNull(hostname);
public OvsdbNodeConfig(String host, IpAddress ip, TpPort port) {
this.host = checkNotNull(host);
this.ip = checkNotNull(ip);
this.port = checkNotNull(port);
}
/**
* Returns hostname of the node.
* Returns host information of the node.
*
* @return hostname
* @return host
*/
public String hostname() {
return this.hostname;
public String host() {
return this.host;
}
/**
......
/*
* Copyright 2014-2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.cordvtn;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.LeadershipEvent;
import org.onosproject.cluster.LeadershipEventListener;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.net.config.ConfigFactory;
import org.onosproject.net.config.NetworkConfigEvent;
import org.onosproject.net.config.NetworkConfigListener;
import org.onosproject.net.config.NetworkConfigRegistry;
import org.onosproject.net.config.NetworkConfigService;
import org.onosproject.net.config.basics.SubjectFactories;
import org.slf4j.Logger;
import static org.onosproject.cordvtn.OvsdbNode.State.INIT;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Reads node information from the network config file and handles the config
* update events.
* Only a leader controller performs the node addition or deletion.
*/
@Component(immediate = true)
public class CordVtnConfigManager {
protected final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected CoreService coreService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected NetworkConfigRegistry configRegistry;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected NetworkConfigService configService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected LeadershipService leadershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected CordVtnService cordVtnService;
private final ConfigFactory configFactory =
new ConfigFactory(SubjectFactories.APP_SUBJECT_FACTORY, CordVtnConfig.class, "cordvtn") {
@Override
public CordVtnConfig createConfig() {
return new CordVtnConfig();
}
};
private final LeadershipEventListener leadershipListener = new InternalLeadershipListener();
private final NetworkConfigListener configListener = new InternalConfigListener();
private NodeId local;
private ApplicationId appId;
@Activate
protected void active() {
local = clusterService.getLocalNode().id();
appId = coreService.getAppId(CordVtnService.CORDVTN_APP_ID);
configService.addListener(configListener);
configRegistry.registerConfigFactory(configFactory);
leadershipService.addListener(leadershipListener);
leadershipService.runForLeadership(CordVtnService.CORDVTN_APP_ID);
}
@Deactivate
protected void deactivate() {
leadershipService.removeListener(leadershipListener);
leadershipService.withdraw(appId.name());
configRegistry.unregisterConfigFactory(configFactory);
configService.removeListener(configListener);
}
private void readConfiguration() {
CordVtnConfig config = configRegistry.getConfig(appId, CordVtnConfig.class);
if (config == null) {
log.warn("No configuration found");
return;
}
config.ovsdbNodes().forEach(node -> {
DefaultOvsdbNode ovsdbNode =
new DefaultOvsdbNode(node.host(), node.ip(), node.port(), INIT);
cordVtnService.addNode(ovsdbNode);
log.info("Add new node {}", node.host());
});
}
private synchronized void processLeadershipChange(NodeId leader) {
if (leader == null || !leader.equals(local)) {
return;
}
readConfiguration();
}
private class InternalLeadershipListener implements LeadershipEventListener {
@Override
public void event(LeadershipEvent event) {
if (event.subject().topic().equals(appId.name())) {
processLeadershipChange(event.subject().leader());
}
}
}
private class InternalConfigListener implements NetworkConfigListener {
@Override
public void event(NetworkConfigEvent event) {
// TODO handle update event
}
}
}
......@@ -15,8 +15,8 @@
*/
package org.onosproject.cordvtn;
import org.onlab.packet.IpAddress;
import org.onlab.packet.TpPort;
import org.onosproject.cordvtn.OvsdbNode.State;
import org.onosproject.net.DeviceId;
import java.util.List;
......@@ -24,22 +24,30 @@ import java.util.List;
* Service for provisioning overlay virtual networks on compute nodes.
*/
public interface CordVtnService {
String CORDVTN_APP_ID = "org.onosproject.cordvtn";
/**
* Adds a new node to the service.
*
* @param hostname hostname of the node
* @param ip ip address to access the ovsdb server running on the node
* @param port port number to access the ovsdb server running on the node
* @param ovsdbNode ovsdb node
*/
void addNode(OvsdbNode ovsdbNode);
/**
* Deletes a node from the service.
*
* @param ovsdbNode ovsdb node
*/
void addNode(String hostname, IpAddress ip, TpPort port);
void deleteNode(OvsdbNode ovsdbNode);
/**
* Deletes the node from the service.
* Updates ovsdb node.
* It only used for updating node's connection state.
*
* @param ip ip address to access the ovsdb server running on the node
* @param port port number to access the ovsdb server running on the node
* @param ovsdbNode ovsdb node
* @param state ovsdb connection state
*/
void deleteNode(IpAddress ip, TpPort port);
void updateNode(OvsdbNode ovsdbNode, State state);
/**
* Returns the number of the nodes known to the service.
......@@ -49,6 +57,14 @@ public interface CordVtnService {
int getNodeCount();
/**
* Returns OvsdbNode with given device id.
*
* @param deviceId device id
* @return ovsdb node
*/
OvsdbNode getNode(DeviceId deviceId);
/**
* Returns all nodes known to the service.
*
* @return list of nodes
......
......@@ -15,6 +15,7 @@
*/
package org.onosproject.cordvtn;
import com.google.common.base.MoreObjects;
import org.onlab.packet.IpAddress;
import org.onlab.packet.TpPort;
import org.onosproject.net.DeviceId;
......@@ -26,21 +27,15 @@ import java.util.Objects;
*/
public class DefaultOvsdbNode implements OvsdbNode {
private final String hostname;
private final String host;
private final IpAddress ip;
private final TpPort port;
private final DeviceId deviceId;
private final DeviceId bridgeId;
private final State state;
public DefaultOvsdbNode(String hostname, IpAddress ip, TpPort port,
DeviceId bridgeId, State state) {
this.hostname = hostname;
public DefaultOvsdbNode(String host, IpAddress ip, TpPort port, State state) {
this.host = host;
this.ip = ip;
this.port = port;
this.deviceId = DeviceId.deviceId(
"ovsdb:" + ip.toString() + ":" + port.toString());
this.bridgeId = bridgeId;
this.state = state;
}
......@@ -55,8 +50,8 @@ public class DefaultOvsdbNode implements OvsdbNode {
}
@Override
public String hostname() {
return this.hostname;
public String host() {
return this.host;
}
@Override
......@@ -66,12 +61,12 @@ public class DefaultOvsdbNode implements OvsdbNode {
@Override
public DeviceId deviceId() {
return this.deviceId;
return DeviceId.deviceId("ovsdb:" + this.ip.toString() + ":" + this.port.toString());
}
@Override
public DeviceId bridgeId() {
return this.bridgeId;
public DeviceId intBrId() {
return DeviceId.deviceId("of:" + this.host);
}
@Override
......@@ -82,8 +77,9 @@ public class DefaultOvsdbNode implements OvsdbNode {
if (o instanceof DefaultOvsdbNode) {
DefaultOvsdbNode that = (DefaultOvsdbNode) o;
// We compare the ip and port only.
if (this.ip.equals(that.ip) && this.port.equals(that.port)) {
if (this.host.equals(that.host) &&
this.ip.equals(that.ip) &&
this.port.equals(that.port)) {
return true;
}
}
......@@ -92,6 +88,16 @@ public class DefaultOvsdbNode implements OvsdbNode {
@Override
public int hashCode() {
return Objects.hash(ip, port);
return Objects.hash(host, ip, port);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("host", host)
.add("ip", ip)
.add("port", port)
.add("state", state)
.toString();
}
}
......
......@@ -15,12 +15,19 @@
*/
package org.onosproject.cordvtn;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.DeviceId;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.net.Device;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService;
import org.slf4j.Logger;
import java.util.concurrent.Executors;
......@@ -28,120 +35,131 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.cordvtn.OvsdbNode.State.CONNECTED;
import static org.onosproject.cordvtn.OvsdbNode.State.DISCONNECTED;
import static org.onosproject.cordvtn.OvsdbNode.State.READY;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Node connection manager.
* Provides the connection state management of all nodes registered to the service
* so that the nodes keep connected unless it is requested to be deleted.
*/
@Component(immediate = true)
public class NodeConnectionManager {
protected final Logger log = getLogger(getClass());
private final ApplicationId appId;
private final NodeId localId;
private final EventuallyConsistentMap<DeviceId, OvsdbNode> nodeStore;
private final MastershipService mastershipService;
private final LeadershipService leadershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
MastershipService mastershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
LeadershipService leadershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
DeviceService deviceService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
CordVtnService cordVtnService;
private static final int DELAY_SEC = 5;
private ScheduledExecutorService connectionExecutor;
/**
* Creates a new NodeConnectionManager.
*
* @param appId app id
* @param localId local id
* @param nodeStore node store
* @param mastershipService mastership service
* @param leadershipService leadership service
*/
public NodeConnectionManager(ApplicationId appId, NodeId localId,
EventuallyConsistentMap<DeviceId, OvsdbNode> nodeStore,
MastershipService mastershipService,
LeadershipService leadershipService) {
this.appId = appId;
this.localId = localId;
this.nodeStore = nodeStore;
this.mastershipService = mastershipService;
this.leadershipService = leadershipService;
}
/**
* Starts the node connection manager.
*/
public void start() {
connectionExecutor = Executors.newSingleThreadScheduledExecutor(
groupedThreads("onos/cordvtn", "connection-executor"));
connectionExecutor.scheduleWithFixedDelay(() -> nodeStore.values()
private final DeviceListener deviceListener = new InternalDeviceListener();
private final ScheduledExecutorService connectionExecutor = Executors
.newSingleThreadScheduledExecutor(groupedThreads("onos/cordvtn", "connection-manager"));
private NodeId localId;
@Activate
protected void activate() {
localId = clusterService.getLocalNode().id();
deviceService.addListener(deviceListener);
connectionExecutor.scheduleWithFixedDelay(() -> cordVtnService.getNodes()
.stream()
.filter(node -> localId.equals(getMaster(node)))
.forEach(this::connectNode), 0, DELAY_SEC, TimeUnit.SECONDS);
.forEach(node -> {
connect(node);
disconnect(node);
}), 0, DELAY_SEC, TimeUnit.SECONDS);
}
/**
* Stops the node connection manager.
*/
@Deactivate
public void stop() {
connectionExecutor.shutdown();
deviceService.removeListener(deviceListener);
}
/**
* Adds a new node to the system.
*
* @param ovsdbNode ovsdb node
*/
public void connectNode(OvsdbNode ovsdbNode) {
public void connect(OvsdbNode ovsdbNode) {
switch (ovsdbNode.state()) {
case INIT:
case DISCONNECTED:
// TODO: set the node to passive mode
setPassiveMode(ovsdbNode);
case READY:
// TODO: initiate connection
break;
case CONNECTED:
setupConnection(ovsdbNode);
break;
default:
break;
}
}
/**
* Deletes the ovsdb node.
*
* @param ovsdbNode ovsdb node
*/
public void disconnectNode(OvsdbNode ovsdbNode) {
public void disconnect(OvsdbNode ovsdbNode) {
switch (ovsdbNode.state()) {
case CONNECTED:
case DISCONNECT:
// TODO: disconnect
break;
case INIT:
case READY:
case DISCONNECTED:
default:
break;
}
}
private class InternalDeviceListener implements DeviceListener {
@Override
public void event(DeviceEvent event) {
Device device = event.subject();
if (device.type() != Device.Type.CONTROLLER) {
return;
}
DefaultOvsdbNode node;
switch (event.type()) {
case DEVICE_ADDED:
node = (DefaultOvsdbNode) cordVtnService.getNode(device.id());
if (node != null) {
cordVtnService.updateNode(node, CONNECTED);
}
break;
case DEVICE_AVAILABILITY_CHANGED:
node = (DefaultOvsdbNode) cordVtnService.getNode(device.id());
if (node != null) {
cordVtnService.updateNode(node, DISCONNECTED);
}
break;
default:
break;
}
}
}
private NodeId getMaster(OvsdbNode ovsdbNode) {
// Return the master of the bridge(switch) if it exist or
// return the current leader
if (ovsdbNode.bridgeId() == DeviceId.NONE) {
return leadershipService.getLeader(this.appId.name());
} else {
return mastershipService.getMasterFor(ovsdbNode.bridgeId());
NodeId master = mastershipService.getMasterFor(ovsdbNode.intBrId());
// master is null if there's no such device
if (master == null) {
master = leadershipService.getLeader(CordVtnService.CORDVTN_APP_ID);
}
return master;
}
private void setPassiveMode(OvsdbNode ovsdbNode) {
// TODO: need ovsdb client implementation first
// TODO: set the remove ovsdb server passive mode
// TODO: set the node state READY if it succeed
cordVtnService.updateNode(ovsdbNode, READY);
}
private void connect(OvsdbNode ovsdbNode) {
// TODO: need ovsdb client implementation first
}
private void disconnect(OvsdbNode ovsdbNode) {
// TODO: need ovsdb client implementation first
private void setupConnection(OvsdbNode ovsdbNode) {
// TODO initiate connection
}
}
......
......@@ -24,51 +24,52 @@ import org.onosproject.net.DeviceId;
*/
public interface OvsdbNode {
/**
* State of the ovsdb node.
* Ovsdb connection state.
*/
enum State {
INIT, READY, CONNECTED, DISCONNECTED
INIT, READY, CONNECTED, DISCONNECT, DISCONNECTED
}
/**
* Returns the IP address of ovsdb server.
* Returns the IP address of the ovsdb server.
*
* @return ip address
*/
IpAddress ip();
/**
* Returns the port number of ovsdb server.
* Returns the port number of the ovsdb server.
*
* @return port number
*/
TpPort port();
/**
* Returns the hostname of the node.
* Returns the host information of the ovsdb server.
* It could be hostname or ip address.
*
* @return hostname
* @return host
*/
String hostname();
String host();
/**
* Returns the state of the node.
* Returns the connection state of the ovsdb server.
*
* @return state of the node
* @return connection state
*/
State state();
/**
* Returns the device ID of the node.
* Returns the device id of the ovsdb server.
*
* @return device id
*/
DeviceId deviceId();
/**
* Returns the device ID of the bridge associated with this node.
* Returns the device id of the integration bridge associated with the node.
*
* @return device id
*/
DeviceId bridgeId();
DeviceId intBrId();
}
......