helenyrwu
Committed by Yuta HIGUCHI

NetconfAlarmProvider alerts core about notifications given subscription.

Change-Id: I7561ba680eb8bac33a8543d6aa1bccf6732e95db
......@@ -141,7 +141,41 @@ public interface NetconfSession {
boolean deleteConfig(String targetConfiguration) throws NetconfException;
/**
* Locks the candidate configuration.
* Starts subscription to the device's notifications.
*
* @throws NetconfException when there is a problem starting the subscription
*/
void startSubscription() throws NetconfException;
/**
* Ends subscription to the device's notifications.
*
* @throws NetconfException when there is a problem ending the subscription
*/
void endSubscription() throws NetconfException;
/**
* Locks the specified configuration.
*
* @param configType type of configuration to be locked
* @return true if successful.
* @throws NetconfException when there is a problem in the communication process on
* the underlying connection
*/
boolean lock(String configType) throws NetconfException;
/**
* Unlocks the specified configuration.
*
* @param configType type of configuration to be locked
* @return true if successful.
* @throws NetconfException when there is a problem in the communication process on
* the underlying connection
*/
boolean unlock(String configType) throws NetconfException;
/**
* Locks the running configuration.
*
* @return true if successful.
* @throws NetconfException when there is a problem in the communication process on
......@@ -150,7 +184,7 @@ public interface NetconfSession {
boolean lock() throws NetconfException;
/**
* Unlocks the candidate configuration.
* Unlocks the running configuration.
*
* @return true if successful.
* @throws NetconfException when there is a problem in the communication process on
......
......@@ -7,6 +7,7 @@ COMPILE_DEPS = [
TEST_DEPS = [
'//lib:TEST_ADAPTERS',
'//utils/osgi:onlab-osgi-tests',
'//core/api:onos-api-tests',
]
osgi_jar_with_tests (
......
......@@ -81,9 +81,10 @@ public class NetconfSessionImpl implements NetconfSession {
private List<String> deviceCapabilities =
Collections.singletonList("urn:ietf:params:netconf:base:1.0");
private String serverCapabilities;
private NetconfStreamHandler t;
private NetconfStreamHandler streamHandler;
private Map<Integer, CompletableFuture<String>> replies;
private List<String> errorReplies;
private boolean subscriptionConnected = false;
public NetconfSessionImpl(NetconfDeviceInfo deviceInfo) throws NetconfException {
......@@ -136,9 +137,9 @@ public class NetconfSessionImpl implements NetconfSession {
try {
sshSession = netconfConnection.openSession();
sshSession.startSubSystem("netconf");
t = new NetconfStreamThread(sshSession.getStdout(), sshSession.getStdin(),
sshSession.getStderr(), deviceInfo,
new NetconfSessionDelegateImpl());
streamHandler = new NetconfStreamThread(sshSession.getStdout(), sshSession.getStdin(),
sshSession.getStderr(), deviceInfo,
new NetconfSessionDelegateImpl());
this.addDeviceOutputListener(new NetconfDeviceOutputEventListenerImpl(deviceInfo));
sendHello();
} catch (IOException e) {
......@@ -148,6 +149,45 @@ public class NetconfSessionImpl implements NetconfSession {
}
}
private void startSubscriptionConnection() throws NetconfException {
if (!serverCapabilities.contains("interleave")) {
throw new NetconfException("Device" + deviceInfo + "does not support interleave");
}
String reply = sendRequest(createSubscriptionString());
if (!checkReply(reply)) {
throw new NetconfException("Subscription not successful with device "
+ deviceInfo + " with reply " + reply);
}
subscriptionConnected = true;
}
public void startSubscription() throws NetconfException {
if (!subscriptionConnected) {
startSubscriptionConnection();
}
streamHandler.setEnableNotifications(true);
}
private String createSubscriptionString() {
StringBuilder subscriptionbuffer = new StringBuilder();
subscriptionbuffer.append("<rpc xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n");
subscriptionbuffer.append(" <create-subscription\n");
subscriptionbuffer.append("xmlns=\"urn:ietf:params:xml:ns:netconf:notification:1.0\">\n");
subscriptionbuffer.append(" </create-subscription>\n");
subscriptionbuffer.append("</rpc>\n");
subscriptionbuffer.append(ENDPATTERN);
return subscriptionbuffer.toString();
}
@Override
public void endSubscription() throws NetconfException {
if (subscriptionConnected) {
streamHandler.setEnableNotifications(false);
} else {
throw new NetconfException("Subscription does not exist.");
}
}
private void sendHello() throws NetconfException {
serverCapabilities = sendRequest(createHelloString());
}
......@@ -197,7 +237,7 @@ public class NetconfSessionImpl implements NetconfSession {
@Override
public CompletableFuture<String> request(String request) {
CompletableFuture<String> ftrep = t.sendMessage(request);
CompletableFuture<String> ftrep = streamHandler.sendMessage(request);
replies.put(messageIdInteger.get(), ftrep);
return ftrep;
}
......@@ -382,31 +422,47 @@ public class NetconfSessionImpl implements NetconfSession {
}
@Override
public boolean lock() throws NetconfException {
public boolean lock(String configType) throws NetconfException {
StringBuilder rpc = new StringBuilder(XML_HEADER);
rpc.append("<rpc>");
rpc.append("<rpc xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n");
rpc.append("<lock>");
rpc.append("<target>");
rpc.append("<candidate/>");
rpc.append("<");
rpc.append(configType);
rpc.append("/>");
rpc.append("</target>");
rpc.append("</lock>");
rpc.append("</rpc>");
rpc.append(ENDPATTERN);
return checkReply(sendRequest(rpc.toString()));
String lockReply = sendRequest(rpc.toString());
return checkReply(lockReply);
}
@Override
public boolean unlock() throws NetconfException {
public boolean unlock(String configType) throws NetconfException {
StringBuilder rpc = new StringBuilder(XML_HEADER);
rpc.append("<rpc>");
rpc.append("<rpc xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n");
rpc.append("<unlock>");
rpc.append("<target>");
rpc.append("<candidate/>");
rpc.append("<");
rpc.append(configType);
rpc.append("/>");
rpc.append("</target>");
rpc.append("</unlock>");
rpc.append("</rpc>");
rpc.append(ENDPATTERN);
return checkReply(sendRequest(rpc.toString()));
String unlockReply = sendRequest(rpc.toString());
return checkReply(unlockReply);
}
@Override
public boolean lock() throws NetconfException {
return lock("running");
}
@Override
public boolean unlock() throws NetconfException {
return unlock("running");
}
@Override
......@@ -454,12 +510,12 @@ public class NetconfSessionImpl implements NetconfSession {
@Override
public void addDeviceOutputListener(NetconfDeviceOutputEventListener listener) {
t.addDeviceEventListener(listener);
streamHandler.addDeviceEventListener(listener);
}
@Override
public void removeDeviceOutputListener(NetconfDeviceOutputEventListener listener) {
t.removeDeviceEventListener(listener);
streamHandler.removeDeviceEventListener(listener);
}
private boolean checkReply(String reply) throws NetconfException {
......@@ -481,6 +537,7 @@ public class NetconfSessionImpl implements NetconfSession {
@Override
public void notify(NetconfDeviceOutputEvent event) {
Optional<Integer> messageId = event.getMessageID();
if (!messageId.isPresent()) {
errorReplies.add(event.getMessagePayload());
log.error("Device {} sent error reply {}",
......
......@@ -16,6 +16,7 @@
package org.onosproject.netconf.ctl;
import com.google.common.annotations.Beta;
import org.onosproject.netconf.NetconfDeviceOutputEventListener;
import java.util.concurrent.CompletableFuture;
......@@ -46,4 +47,13 @@ public interface NetconfStreamHandler {
* @param listener Netconf device event listener
*/
void removeDeviceEventListener(NetconfDeviceOutputEventListener listener);
@Beta
/**
* Sets instance variable that when true allows receipt of notifications.
*
* @param enableNotifications if true, allows action based off notifications
* else, stops action based off notifications
*/
void setEnableNotifications(boolean enableNotifications);
}
......
......@@ -57,8 +57,9 @@ public class NetconfStreamThread extends Thread implements NetconfStreamHandler
private NetconfDeviceInfo netconfDeviceInfo;
private NetconfSessionDelegate sessionDelegate;
private NetconfMessageState state;
private List<NetconfDeviceOutputEventListener> netconfDeviceEventListeners
= Lists.newArrayList();
private List<NetconfDeviceOutputEventListener> netconfDeviceEventListeners
= Lists.newCopyOnWriteArrayList();
private boolean enableNotifications = true;
public NetconfStreamThread(final InputStream in, final OutputStream out,
final InputStream err, NetconfDeviceInfo deviceInfo,
......@@ -195,12 +196,14 @@ public class NetconfStreamThread extends Thread implements NetconfStreamHandler
netconfDeviceEventListeners.forEach(
listener -> listener.event(event));
} else if (deviceReply.contains(NOTIFICATION_LABEL)) {
final String finalDeviceReply = deviceReply;
netconfDeviceEventListeners.forEach(
listener -> listener.event(new NetconfDeviceOutputEvent(
NetconfDeviceOutputEvent.Type.DEVICE_NOTIFICATION,
null, finalDeviceReply, getMsgId(finalDeviceReply),
netconfDeviceInfo)));
if (enableNotifications) {
final String finalDeviceReply = deviceReply;
netconfDeviceEventListeners.forEach(
listener -> listener.event(new NetconfDeviceOutputEvent(
NetconfDeviceOutputEvent.Type.DEVICE_NOTIFICATION,
null, finalDeviceReply, getMsgId(finalDeviceReply),
netconfDeviceInfo)));
}
} else {
log.info("Error on replay from device {} ", deviceReply);
}
......@@ -240,4 +243,8 @@ public class NetconfStreamThread extends Thread implements NetconfStreamHandler
public void removeDeviceEventListener(NetconfDeviceOutputEventListener listener) {
netconfDeviceEventListeners.remove(listener);
}
public void setEnableNotifications(boolean enableNotifications) {
this.enableNotifications = enableNotifications;
}
}
......
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>onos-netconf-providers</artifactId>
<groupId>org.onosproject</groupId>
<version>1.7.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>onos-netconf-provider-alarm</artifactId>
<packaging>bundle</packaging>
<description>ONOS Netconf protocol alarm provider</description>
<dependencies>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onlab-junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-netconf-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-netconf-ctl</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-scr-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.onosproject</groupId>
<artifactId>onos-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.provider.netconf.alarm;
import com.google.common.collect.Maps;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onosproject.incubator.net.faultmanagement.alarm.Alarm;
import org.onosproject.incubator.net.faultmanagement.alarm.AlarmProvider;
import org.onosproject.incubator.net.faultmanagement.alarm.AlarmProviderService;
import org.onosproject.incubator.net.faultmanagement.alarm.AlarmService;
import org.onosproject.incubator.net.faultmanagement.alarm.AlarmProviderRegistry;
import org.onosproject.incubator.net.faultmanagement.alarm.DefaultAlarm;
import org.onosproject.net.DeviceId;
import org.onosproject.net.provider.AbstractProvider;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.netconf.NetconfController;
import org.onosproject.netconf.NetconfDevice;
import org.onosproject.netconf.NetconfDeviceInfo;
import org.onosproject.netconf.NetconfDeviceListener;
import org.onosproject.netconf.NetconfDeviceOutputEvent;
import org.onosproject.netconf.NetconfDeviceOutputEventListener;
import org.onosproject.netconf.NetconfSession;
import org.onosproject.netconf.ctl.NetconfDeviceOutputEventListenerImpl;
import org.slf4j.Logger;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Provider which uses an Alarm Manager to keep track of device notifications.
*/
@Component(immediate = true)
public class NetconfAlarmProvider extends AbstractProvider implements AlarmProvider {
public static final String ACTIVE = "active";
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected AlarmProviderRegistry providerRegistry;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected NetconfController controller;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected AlarmService alarmService;
protected AlarmProviderService providerService;
private Map<DeviceId, InternalNotificationListener> idNotificationListenerMap = Maps.newHashMap();
public NetconfAlarmProvider() {
super(new ProviderId("netconf", "org.onosproject.netconf"));
}
private NetconfDeviceListener deviceListener = new InnerDeviceListener();
@Activate
public void activate() {
providerService = providerRegistry.register(this);
controller.getNetconfDevices().forEach(id -> {
NetconfDevice device = controller.getNetconfDevice(id);
NetconfSession session = device.getSession();
InternalNotificationListener listener = new InternalNotificationListener(device.getDeviceInfo());
session.addDeviceOutputListener(listener);
idNotificationListenerMap.put(id, listener);
});
controller.addDeviceListener(deviceListener);
log.info("NetconfAlarmProvider Started");
}
@Deactivate
public void deactivate() {
providerRegistry.unregister(this);
idNotificationListenerMap.forEach((id, listener) -> {
controller.getNetconfDevice(id)
.getSession()
.removeDeviceOutputListener(listener);
});
controller.removeDeviceListener(deviceListener);
providerService = null;
log.info("NetconfAlarmProvider Stopped");
}
@Override
public void triggerProbe(DeviceId deviceId) {
log.debug("Alarm probe triggered with " + deviceId);
}
private void triggerProbe(DeviceId deviceId, Collection<Alarm> alarms) {
providerService.updateAlarmList(deviceId, alarms);
triggerProbe(deviceId);
}
private class InternalNotificationListener extends NetconfDeviceOutputEventListenerImpl
implements NetconfDeviceOutputEventListener {
InternalNotificationListener(NetconfDeviceInfo deviceInfo) {
super(deviceInfo);
}
@Override
public void event(NetconfDeviceOutputEvent event) {
if (event.type() == NetconfDeviceOutputEvent.Type.DEVICE_NOTIFICATION) {
DeviceId deviceId = event.getDeviceInfo().getDeviceId();
Alarm newAlarm = new DefaultAlarm.Builder(deviceId, event.getMessagePayload(),
Alarm.SeverityLevel.WARNING, 0).build();
Collection<Alarm> alarms = Collections.singleton(newAlarm);
triggerProbe(deviceId, alarms);
}
}
}
private class InnerDeviceListener implements NetconfDeviceListener {
@Override
public void deviceAdded(DeviceId deviceId) {
NetconfDevice device = controller.getNetconfDevice(deviceId);
NetconfSession session = device.getSession();
InternalNotificationListener listener = new InternalNotificationListener(device.getDeviceInfo());
session.addDeviceOutputListener(listener);
idNotificationListenerMap.put(deviceId, listener);
}
@Override
public void deviceRemoved(DeviceId deviceId) {
idNotificationListenerMap.remove(deviceId);
}
}
}
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Provider that uses Netconf device output listener capability to send
* appropriate alarms to the alarm manager.
*/
package org.onosproject.provider.netconf.alarm;
\ No newline at end of file
......@@ -17,11 +17,13 @@
<app name="org.onosproject.netconf" origin="ON.Lab" version="${project.version}"
category="Provider" url="https://wiki.onosproject.org/display/ONOS/NETCONF" title="NETCONF Provider"
featuresRepo="mvn:${project.groupId}/${project.artifactId}/${project.version}/xml/features"
apps="org.onosproject.faultmanagement"
features="${project.artifactId}">
<description>${project.description}</description>
<artifact>mvn:${project.groupId}/onos-netconf-api/${project.version}</artifact>
<artifact>mvn:${project.groupId}/onos-netconf-ctl/${project.version}</artifact>
<artifact>mvn:${project.groupId}/onos-netconf-provider-device/${project.version}</artifact>
<artifact>mvn:${project.groupId}/onos-netconf-provider-alarm/${project.version}</artifact>
</app>
......
......@@ -23,6 +23,7 @@
<bundle>mvn:${project.groupId}/onos-netconf-ctl/${project.version}</bundle>
<bundle>mvn:${project.groupId}/onos-netconf-provider-device/${project.version}</bundle>
<bundle>mvn:${project.groupId}/onos-netconf-provider-alarm/${project.version}</bundle>
</feature>
</features>
......
......@@ -16,7 +16,7 @@
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
......@@ -26,7 +26,19 @@
</parent>
<artifactId>onos-netconf-app</artifactId>
<packaging>pom</packaging>
<packaging>bundle</packaging>
<properties>
<onos.app.name>org.onosproject.netconf</onos.app.name>
<onos.app.title>Netconf Meta App</onos.app.title>
<onos.app.category>Provider</onos.app.category>
<onos.app.requires>
org.onosproject.incubator.net.faultmanagement
</onos.app.requires>
<onos.app.url>
https://wiki.onosproject.org/display/ONOS/Application+Subsystem
</onos.app.url>
</properties>
<description>NETCONF protocol southbound providers</description>
......@@ -36,6 +48,11 @@
<artifactId>onos-netconf-provider-device</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-netconf-provider-alarm</artifactId>
<version>${project.version}</version>
</dependency>
<!-- Add other dependencies here as more bundles are added to the app -->
</dependencies>
......
......@@ -302,8 +302,6 @@ public class NetconfDeviceProvider extends AbstractProvider
DeviceKeyId.deviceKeyId(deviceId.toString()),
null, addr.name(), addr.password()));
providerService.deviceConnected(deviceId, deviceDescription);
});
} catch (ConfigException e) {
log.error("Cannot read config error " + e);
......
......@@ -33,6 +33,7 @@
<modules>
<module>device</module>
<module>app</module>
<module>alarm</module>
</modules>
<dependencies>
......