Andrea Campanella
Committed by Gerrit Code Review

ONOS-3605 Create thread Session input stream mechanism, adding listener for events from the device

Change-Id: Ib323487f61d9e595f7ccdc1957a92e58b7002d2a
...@@ -19,7 +19,6 @@ package org.onosproject.netconf; ...@@ -19,7 +19,6 @@ package org.onosproject.netconf;
19 import org.onlab.packet.IpAddress; 19 import org.onlab.packet.IpAddress;
20 import org.onosproject.net.DeviceId; 20 import org.onosproject.net.DeviceId;
21 21
22 -import java.io.IOException;
23 import java.util.Map; 22 import java.util.Map;
24 23
25 /** 24 /**
...@@ -48,8 +47,9 @@ public interface NetconfController { ...@@ -48,8 +47,9 @@ public interface NetconfController {
48 * 47 *
49 * @param deviceInfo info about the device to add 48 * @param deviceInfo info about the device to add
50 * @return NetconfDevice Netconf device 49 * @return NetconfDevice Netconf device
50 + * @throws NetconfException when device is not available
51 */ 51 */
52 - NetconfDevice connectDevice(NetconfDeviceInfo deviceInfo) throws IOException; 52 + NetconfDevice connectDevice(NetconfDeviceInfo deviceInfo) throws NetconfException;
53 53
54 /** 54 /**
55 * Removes a Netconf device. 55 * Removes a Netconf device.
......
1 +/*
2 + * Copyright 2015 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +
17 +package org.onosproject.netconf;
18 +
19 +import org.onosproject.event.AbstractEvent;
20 +
21 +/**
22 + * Describes network configuration event.
23 + */
24 +public final class NetconfDeviceOutputEvent extends
25 + AbstractEvent<NetconfDeviceOutputEvent.Type, Object> {
26 +
27 + private final String messagePayload;
28 + private final int messageID;
29 + private final NetconfDeviceInfo deviceInfo;
30 +
31 + /**
32 + * Type of network configuration events.
33 + */
34 + public enum Type {
35 + /**
36 + * Signifies that sent a reply to a request.
37 + */
38 + DEVICE_REPLY,
39 +
40 + /**
41 + * Signifies that the device sent a notification.
42 + */
43 + DEVICE_NOTIFICATION,
44 +
45 + /**
46 + * Signifies that the device is not reachable.
47 + */
48 + DEVICE_UNREGISTERED,
49 +
50 + /**
51 + * Signifies that the device has encountered an error.
52 + */
53 + DEVICE_ERROR,
54 +
55 + }
56 +
57 + /**
58 + * Creates an event of a given type and for the specified subject and the
59 + * current time.
60 + *
61 + * @param type event type
62 + * @param subject event subject
63 + * @param payload message from the device
64 + * @param msgID id of the message related to the event
65 + * @param netconfDeviceInfo device of event
66 + */
67 + public NetconfDeviceOutputEvent(Type type, Object subject, String payload, int msgID,
68 + NetconfDeviceInfo netconfDeviceInfo) {
69 + super(type, subject);
70 + messagePayload = payload;
71 + this.messageID = msgID;
72 + deviceInfo = netconfDeviceInfo;
73 + }
74 +
75 + /**
76 + * Creates an event of a given type and for the specified subject and time.
77 + *
78 + * @param type event type
79 + * @param subject event subject
80 + * @param payload message from the device
81 + * @param msgID id of the message related to the event
82 + * @param netconfDeviceInfo device of event
83 + * @param msgID id of the message related to the event
84 + * @param time occurrence time
85 + */
86 + public NetconfDeviceOutputEvent(Type type, Object subject, String payload, int msgID,
87 + NetconfDeviceInfo netconfDeviceInfo, long time) {
88 + super(type, subject, time);
89 + messagePayload = payload;
90 + deviceInfo = netconfDeviceInfo;
91 + this.messageID = msgID;
92 + }
93 +
94 + public String getMessagePayload() {
95 + return messagePayload;
96 + }
97 +
98 + public NetconfDeviceInfo getDeviceInfo() {
99 + return deviceInfo;
100 + }
101 +
102 + public Integer getMessageID() {
103 + return messageID;
104 + }
105 +}
1 +/*
2 + * Copyright 2016 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +
17 +package org.onosproject.netconf;
18 +
19 +import org.onosproject.event.EventListener;
20 +
21 +/**
22 + * Interface for Netconf device output Listeners.
23 + */
24 +public interface NetconfDeviceOutputEventListener extends EventListener<NetconfDeviceOutputEvent> {
25 +}
1 +/*
2 + * Copyright 2016 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.netconf;
17 +
18 +import java.io.IOException;
19 +
20 +/**
21 + * Represents class of errors related to NETCONF SB protocol.
22 + */
23 +public class NetconfException extends IOException {
24 + /**
25 + * Constructs an exception with the specified message.
26 + *
27 + * @param message the message describing the specific nature of the error
28 + */
29 + public NetconfException(String message) {
30 + super(message);
31 + }
32 +
33 + /**
34 + * Constructs an exception with the specified message and the underlying cause.
35 + *
36 + * @param message the message describing the specific nature of the error
37 + * @param cause the underlying cause of this error
38 + */
39 + public NetconfException(String message, Throwable cause) {
40 + super(message, cause);
41 + }
42 +}
...@@ -16,8 +16,8 @@ ...@@ -16,8 +16,8 @@
16 16
17 package org.onosproject.netconf; 17 package org.onosproject.netconf;
18 18
19 -import java.io.IOException;
20 import java.util.List; 19 import java.util.List;
20 +import java.util.concurrent.CompletableFuture;
21 21
22 /** 22 /**
23 * NETCONF session object that allows NETCONF operations on top with the physical 23 * NETCONF session object that allows NETCONF operations on top with the physical
...@@ -27,28 +27,45 @@ import java.util.List; ...@@ -27,28 +27,45 @@ import java.util.List;
27 public interface NetconfSession { 27 public interface NetconfSession {
28 28
29 /** 29 /**
30 + * Executes an asynchronous RPC to the server and obtains a future to be completed.
31 + *
32 + * @param request the XML containing the RPC for the server.
33 + * @return Server response or ERROR
34 + * @throws NetconfException when there is a problem in the communication process on
35 + * the underlying connection
36 + */
37 + CompletableFuture<String> request(String request) throws NetconfException;
38 +
39 +
40 + /**
30 * Retrives the requested configuration, different from get-config. 41 * Retrives the requested configuration, different from get-config.
31 * 42 *
32 * @param request the XML containing the request to the server. 43 * @param request the XML containing the request to the server.
33 * @return device running configuration 44 * @return device running configuration
45 + * @throws NetconfException when there is a problem in the communication process on
46 + * the underlying connection
34 */ 47 */
35 - String get(String request) throws IOException; 48 + String get(String request) throws NetconfException;
36 49
37 /** 50 /**
38 - * Executes an RPC to the server. 51 + * Executes an synchronous RPC to the server.
39 * 52 *
40 * @param request the XML containing the RPC for the server. 53 * @param request the XML containing the RPC for the server.
41 * @return Server response or ERROR 54 * @return Server response or ERROR
55 + * @throws NetconfException when there is a problem in the communication process on
56 + * the underlying connection
42 */ 57 */
43 - String doRPC(String request) throws IOException; 58 + String requestSync(String request) throws NetconfException;
44 59
45 /** 60 /**
46 * Retrives the specified configuration. 61 * Retrives the specified configuration.
47 * 62 *
48 * @param targetConfiguration the type of configuration to retrieve. 63 * @param targetConfiguration the type of configuration to retrieve.
49 * @return specified configuration. 64 * @return specified configuration.
65 + * @throws NetconfException when there is a problem in the communication process on
66 + * the underlying connection
50 */ 67 */
51 - String getConfig(String targetConfiguration) throws IOException; 68 + String getConfig(String targetConfiguration) throws NetconfException;
52 69
53 /** 70 /**
54 * Retrives part of the specivied configuration based on the filterSchema. 71 * Retrives part of the specivied configuration based on the filterSchema.
...@@ -57,28 +74,35 @@ public interface NetconfSession { ...@@ -57,28 +74,35 @@ public interface NetconfSession {
57 * @param configurationFilterSchema XML schema to filter the configuration 74 * @param configurationFilterSchema XML schema to filter the configuration
58 * elements we are interested in 75 * elements we are interested in
59 * @return device running configuration. 76 * @return device running configuration.
77 + * @throws NetconfException when there is a problem in the communication process on
78 + * the underlying connection
60 */ 79 */
61 String getConfig(String targetConfiguration, String configurationFilterSchema) 80 String getConfig(String targetConfiguration, String configurationFilterSchema)
62 - throws IOException; 81 + throws NetconfException;
63 82
64 /** 83 /**
65 * Retrives part of the specified configuration based on the filterSchema. 84 * Retrives part of the specified configuration based on the filterSchema.
66 * 85 *
67 * @param newConfiguration configuration to set 86 * @param newConfiguration configuration to set
68 * @return true if the configuration was edited correctly 87 * @return true if the configuration was edited correctly
88 + * @throws NetconfException when there is a problem in the communication process on
89 + * the underlying connection
69 */ 90 */
70 91
71 - boolean editConfig(String newConfiguration) throws IOException; 92 + boolean editConfig(String newConfiguration) throws NetconfException;
72 93
73 /** 94 /**
74 * Retrives part of the specified configuration based on the filterSchema. 95 * Retrives part of the specified configuration based on the filterSchema.
96 + *
75 * @param targetConfiguration the targetConfiguration to change 97 * @param targetConfiguration the targetConfiguration to change
76 - * @param mode selected mode to change the configuration 98 + * @param mode selected mode to change the configuration
77 - * @param newConfiguration configuration to set 99 + * @param newConfiguration configuration to set
78 * @return true if the configuration was edited correctly 100 * @return true if the configuration was edited correctly
101 + * @throws NetconfException when there is a problem in the communication process on
102 + * the underlying connection
79 */ 103 */
80 boolean editConfig(String targetConfiguration, String mode, String newConfiguration) 104 boolean editConfig(String targetConfiguration, String mode, String newConfiguration)
81 - throws IOException; 105 + throws NetconfException;
82 106
83 /** 107 /**
84 * Copies the new configuration, an Url or a complete configuration xml tree 108 * Copies the new configuration, an Url or a complete configuration xml tree
...@@ -88,39 +112,49 @@ public interface NetconfSession { ...@@ -88,39 +112,49 @@ public interface NetconfSession {
88 * @param targetConfiguration the type of configuration to retrieve. 112 * @param targetConfiguration the type of configuration to retrieve.
89 * @param newConfiguration configuration to set 113 * @param newConfiguration configuration to set
90 * @return true if the configuration was copied correctly 114 * @return true if the configuration was copied correctly
115 + * @throws NetconfException when there is a problem in the communication process on
116 + * the underlying connection
91 */ 117 */
92 boolean copyConfig(String targetConfiguration, String newConfiguration) 118 boolean copyConfig(String targetConfiguration, String newConfiguration)
93 - throws IOException; 119 + throws NetconfException;
94 120
95 /** 121 /**
96 * Deletes part of the specified configuration based on the filterSchema. 122 * Deletes part of the specified configuration based on the filterSchema.
97 * 123 *
98 * @param targetConfiguration the name of the configuration to delete 124 * @param targetConfiguration the name of the configuration to delete
99 * @return true if the configuration was copied correctly 125 * @return true if the configuration was copied correctly
126 + * @throws NetconfException when there is a problem in the communication process on
127 + * the underlying connection
100 */ 128 */
101 - boolean deleteConfig(String targetConfiguration) throws IOException; 129 + boolean deleteConfig(String targetConfiguration) throws NetconfException;
102 130
103 /** 131 /**
104 * Locks the candidate configuration. 132 * Locks the candidate configuration.
105 * 133 *
106 * @return true if successful. 134 * @return true if successful.
135 + * @throws NetconfException when there is a problem in the communication process on
136 + * the underlying connection
107 */ 137 */
108 - boolean lock() throws IOException; 138 + boolean lock() throws NetconfException;
109 139
110 /** 140 /**
111 * Unlocks the candidate configuration. 141 * Unlocks the candidate configuration.
112 * 142 *
113 * @return true if successful. 143 * @return true if successful.
144 + * @throws NetconfException when there is a problem in the communication process on
145 + * the underlying connection
114 */ 146 */
115 - boolean unlock() throws IOException; 147 + boolean unlock() throws NetconfException;
116 148
117 /** 149 /**
118 * Closes the Netconf session with the device. 150 * Closes the Netconf session with the device.
119 * the first time it tries gracefully, then kills it forcefully 151 * the first time it tries gracefully, then kills it forcefully
120 * 152 *
121 * @return true if closed 153 * @return true if closed
154 + * @throws NetconfException when there is a problem in the communication process on
155 + * the underlying connection
122 */ 156 */
123 - boolean close() throws IOException; 157 + boolean close() throws NetconfException;
124 158
125 /** 159 /**
126 * Gets the session ID of the Netconf session. 160 * Gets the session ID of the Netconf session.
...@@ -137,10 +171,24 @@ public interface NetconfSession { ...@@ -137,10 +171,24 @@ public interface NetconfSession {
137 String getServerCapabilities(); 171 String getServerCapabilities();
138 172
139 /** 173 /**
140 - * Sets the device capabilities. 174 + * Sets the ONOS side capabilities.
141 * 175 *
142 * @param capabilities list of capabilities the device has. 176 * @param capabilities list of capabilities the device has.
143 */ 177 */
144 void setDeviceCapabilities(List<String> capabilities); 178 void setDeviceCapabilities(List<String> capabilities);
145 179
180 + /**
181 + * Remove a listener from the underlying stream handler implementation.
182 + *
183 + * @param listener event listener.
184 + */
185 + void addDeviceOutputListener(NetconfDeviceOutputEventListener listener);
186 +
187 + /**
188 + * Remove a listener from the underlying stream handler implementation.
189 + *
190 + * @param listener event listener.
191 + */
192 + void removeDeviceOutputListener(NetconfDeviceOutputEventListener listener);
193 +
146 } 194 }
......
...@@ -26,11 +26,11 @@ import org.onosproject.netconf.NetconfController; ...@@ -26,11 +26,11 @@ import org.onosproject.netconf.NetconfController;
26 import org.onosproject.netconf.NetconfDevice; 26 import org.onosproject.netconf.NetconfDevice;
27 import org.onosproject.netconf.NetconfDeviceInfo; 27 import org.onosproject.netconf.NetconfDeviceInfo;
28 import org.onosproject.netconf.NetconfDeviceListener; 28 import org.onosproject.netconf.NetconfDeviceListener;
29 +import org.onosproject.netconf.NetconfException;
29 import org.osgi.service.component.ComponentContext; 30 import org.osgi.service.component.ComponentContext;
30 import org.slf4j.Logger; 31 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory; 32 import org.slf4j.LoggerFactory;
32 33
33 -import java.io.IOException;
34 import java.util.Map; 34 import java.util.Map;
35 import java.util.Set; 35 import java.util.Set;
36 import java.util.concurrent.ConcurrentHashMap; 36 import java.util.concurrent.ConcurrentHashMap;
...@@ -90,9 +90,9 @@ public class NetconfControllerImpl implements NetconfController { ...@@ -90,9 +90,9 @@ public class NetconfControllerImpl implements NetconfController {
90 } 90 }
91 91
92 @Override 92 @Override
93 - public NetconfDevice connectDevice(NetconfDeviceInfo deviceInfo) throws IOException { 93 + public NetconfDevice connectDevice(NetconfDeviceInfo deviceInfo) throws NetconfException {
94 if (netconfDeviceMap.containsKey(deviceInfo.getDeviceId())) { 94 if (netconfDeviceMap.containsKey(deviceInfo.getDeviceId())) {
95 - log.warn("Device {} is already present", deviceInfo); 95 + log.info("Device {} is already present", deviceInfo);
96 return netconfDeviceMap.get(deviceInfo.getDeviceId()); 96 return netconfDeviceMap.get(deviceInfo.getDeviceId());
97 } else { 97 } else {
98 log.info("Creating NETCONF device {}", deviceInfo); 98 log.info("Creating NETCONF device {}", deviceInfo);
...@@ -109,9 +109,8 @@ public class NetconfControllerImpl implements NetconfController { ...@@ -109,9 +109,8 @@ public class NetconfControllerImpl implements NetconfController {
109 } 109 }
110 } 110 }
111 111
112 - private NetconfDevice createDevice(NetconfDeviceInfo deviceInfo) throws IOException { 112 + private NetconfDevice createDevice(NetconfDeviceInfo deviceInfo) throws NetconfException {
113 - NetconfDevice netconfDevice = null; 113 + NetconfDevice netconfDevice = new NetconfDeviceImpl(deviceInfo);
114 - netconfDevice = new NetconfDeviceImpl(deviceInfo);
115 for (NetconfDeviceListener l : netconfDeviceListeners) { 114 for (NetconfDeviceListener l : netconfDeviceListeners) {
116 l.deviceAdded(deviceInfo); 115 l.deviceAdded(deviceInfo);
117 } 116 }
......
...@@ -18,6 +18,7 @@ package org.onosproject.netconf.ctl; ...@@ -18,6 +18,7 @@ package org.onosproject.netconf.ctl;
18 18
19 import org.onosproject.netconf.NetconfDevice; 19 import org.onosproject.netconf.NetconfDevice;
20 import org.onosproject.netconf.NetconfDeviceInfo; 20 import org.onosproject.netconf.NetconfDeviceInfo;
21 +import org.onosproject.netconf.NetconfException;
21 import org.onosproject.netconf.NetconfSession; 22 import org.onosproject.netconf.NetconfSession;
22 import org.slf4j.Logger; 23 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory; 24 import org.slf4j.LoggerFactory;
...@@ -36,12 +37,13 @@ public class NetconfDeviceImpl implements NetconfDevice { ...@@ -36,12 +37,13 @@ public class NetconfDeviceImpl implements NetconfDevice {
36 private boolean deviceState = false; 37 private boolean deviceState = false;
37 private NetconfSession netconfSession; 38 private NetconfSession netconfSession;
38 39
39 - public NetconfDeviceImpl(NetconfDeviceInfo deviceInfo) throws IOException { 40 + public NetconfDeviceImpl(NetconfDeviceInfo deviceInfo) throws NetconfException {
40 netconfDeviceInfo = deviceInfo; 41 netconfDeviceInfo = deviceInfo;
41 try { 42 try {
42 netconfSession = new NetconfSessionImpl(netconfDeviceInfo); 43 netconfSession = new NetconfSessionImpl(netconfDeviceInfo);
43 } catch (IOException e) { 44 } catch (IOException e) {
44 - throw new IOException("Cannot create connection and session", e); 45 + throw new NetconfException("Cannot create connection and session for device " +
46 + deviceInfo, e);
45 } 47 }
46 deviceState = true; 48 deviceState = true;
47 } 49 }
......
1 +/*
2 + * Copyright 2015 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +
17 +package org.onosproject.netconf.ctl;
18 +
19 +import org.onosproject.netconf.NetconfDeviceInfo;
20 +import org.onosproject.netconf.NetconfDeviceOutputEvent;
21 +import org.onosproject.netconf.NetconfDeviceOutputEventListener;
22 +import org.slf4j.Logger;
23 +import org.slf4j.LoggerFactory;
24 +
25 +/**
26 + * Example of a listener for events that happen a Netconf session established
27 + * for a particular NETCONF device.
28 + */
29 +public class NetconfDeviceOutputEventListenerImpl implements NetconfDeviceOutputEventListener {
30 +
31 + private static final Logger log =
32 + LoggerFactory.getLogger(NetconfDeviceOutputEventListenerImpl.class);
33 +
34 + private NetconfDeviceInfo deviceInfo;
35 +
36 + public NetconfDeviceOutputEventListenerImpl(NetconfDeviceInfo deviceInfo) {
37 + this.deviceInfo = deviceInfo;
38 + }
39 +
40 + @Override
41 + public void event(NetconfDeviceOutputEvent event) {
42 + switch (event.type()) {
43 + case DEVICE_REPLY:
44 + log.debug("Device {} has reply: {}", deviceInfo, event.getMessagePayload());
45 + break;
46 + case DEVICE_NOTIFICATION:
47 + log.info("Device {} has notification: {}", deviceInfo, event.getMessagePayload());
48 + break;
49 + case DEVICE_UNREGISTERED:
50 + log.warn("Device {} has closed session", deviceInfo);
51 + //TODO tell onos about closed session
52 + break;
53 + case DEVICE_ERROR:
54 + log.warn("Device {} has error: {}", deviceInfo, event.getMessagePayload());
55 + break;
56 + default:
57 + log.warn("Wrong event type {} ", event.type());
58 + }
59 +
60 + }
61 +
62 + @Override
63 + public boolean isRelevant(NetconfDeviceOutputEvent event) {
64 + return deviceInfo.equals(event.getDeviceInfo());
65 + }
66 +}
1 +/*
2 + * Copyright 2015 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +
17 +package org.onosproject.netconf.ctl;
18 +
19 +import org.onosproject.netconf.NetconfDeviceOutputEvent;
20 +
21 +/**
22 + * Entity associated with a NetconfSessionImpl and capable of receiving notifications of
23 + * events about the session.
24 + */
25 +public interface NetconfSessionDelegate {
26 +
27 + /**
28 + * Notifies the delegate via the specified event.
29 + *
30 + * @param event store generated event
31 + */
32 + void notify(NetconfDeviceOutputEvent event);
33 +}
...@@ -18,56 +18,70 @@ package org.onosproject.netconf.ctl; ...@@ -18,56 +18,70 @@ package org.onosproject.netconf.ctl;
18 18
19 import ch.ethz.ssh2.Connection; 19 import ch.ethz.ssh2.Connection;
20 import ch.ethz.ssh2.Session; 20 import ch.ethz.ssh2.Session;
21 -import ch.ethz.ssh2.StreamGobbler;
22 import com.google.common.base.Preconditions; 21 import com.google.common.base.Preconditions;
23 import org.onosproject.netconf.NetconfDeviceInfo; 22 import org.onosproject.netconf.NetconfDeviceInfo;
23 +import org.onosproject.netconf.NetconfDeviceOutputEvent;
24 +import org.onosproject.netconf.NetconfDeviceOutputEventListener;
25 +import org.onosproject.netconf.NetconfException;
24 import org.onosproject.netconf.NetconfSession; 26 import org.onosproject.netconf.NetconfSession;
25 import org.slf4j.Logger; 27 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory; 28 import org.slf4j.LoggerFactory;
27 29
28 -import java.io.BufferedReader;
29 import java.io.IOException; 30 import java.io.IOException;
30 -import java.io.InputStreamReader;
31 import java.io.PrintWriter; 31 import java.io.PrintWriter;
32 -import java.io.StringWriter;
33 import java.util.Collections; 32 import java.util.Collections;
33 +import java.util.HashMap;
34 import java.util.List; 34 import java.util.List;
35 +import java.util.Map;
36 +import java.util.concurrent.CompletableFuture;
37 +import java.util.concurrent.atomic.AtomicInteger;
38 +
35 39
36 /** 40 /**
37 * Implementation of a NETCONF session to talk to a device. 41 * Implementation of a NETCONF session to talk to a device.
38 */ 42 */
39 public class NetconfSessionImpl implements NetconfSession { 43 public class NetconfSessionImpl implements NetconfSession {
40 44
41 - public static final Logger log = LoggerFactory 45 + private static final Logger log = LoggerFactory
42 .getLogger(NetconfSessionImpl.class); 46 .getLogger(NetconfSessionImpl.class);
47 +
48 +
43 private static final int CONNECTION_TIMEOUT = 0; 49 private static final int CONNECTION_TIMEOUT = 0;
50 + private static final String ENDPATTERN = "]]>]]>";
51 + private static final AtomicInteger MESSAGE_ID_INTEGER = new AtomicInteger(0);
52 + private static final String MESSAGE_ID_STRING = "message-id";
53 + private static final String HELLO = "hello";
54 + private static final String NEW_LINE = "\n";
44 55
45 56
46 private Connection netconfConnection; 57 private Connection netconfConnection;
47 private NetconfDeviceInfo deviceInfo; 58 private NetconfDeviceInfo deviceInfo;
48 private Session sshSession; 59 private Session sshSession;
49 private boolean connectionActive; 60 private boolean connectionActive;
50 - private BufferedReader bufferReader = null;
51 private PrintWriter out = null; 61 private PrintWriter out = null;
52 - private int messageID = 0;
53 - //TODO inject these capabilites from yang model provided by app
54 private List<String> deviceCapabilities = 62 private List<String> deviceCapabilities =
55 Collections.singletonList("urn:ietf:params:netconf:base:1.0"); 63 Collections.singletonList("urn:ietf:params:netconf:base:1.0");
56 private String serverCapabilities; 64 private String serverCapabilities;
57 - private String endpattern = "]]>]]>"; 65 + private NetconfStreamHandler t;
66 + private Map<Integer, CompletableFuture<String>> replies;
58 67
59 68
60 - public NetconfSessionImpl(NetconfDeviceInfo deviceInfo) throws IOException { 69 + public NetconfSessionImpl(NetconfDeviceInfo deviceInfo) throws NetconfException {
61 this.deviceInfo = deviceInfo; 70 this.deviceInfo = deviceInfo;
62 connectionActive = false; 71 connectionActive = false;
72 + replies = new HashMap<>();
63 startConnection(); 73 startConnection();
64 } 74 }
65 75
66 76
67 - private void startConnection() throws IOException { 77 + private void startConnection() throws NetconfException {
68 if (!connectionActive) { 78 if (!connectionActive) {
69 netconfConnection = new Connection(deviceInfo.ip().toString(), deviceInfo.port()); 79 netconfConnection = new Connection(deviceInfo.ip().toString(), deviceInfo.port());
70 - netconfConnection.connect(null, CONNECTION_TIMEOUT, 0); 80 + try {
81 + netconfConnection.connect(null, CONNECTION_TIMEOUT, 5000);
82 + } catch (IOException e) {
83 + throw new NetconfException("Cannot open a connection with device" + deviceInfo, e);
84 + }
71 boolean isAuthenticated; 85 boolean isAuthenticated;
72 try { 86 try {
73 if (deviceInfo.getKeyFile() != null) { 87 if (deviceInfo.getKeyFile() != null) {
...@@ -75,39 +89,49 @@ public class NetconfSessionImpl implements NetconfSession { ...@@ -75,39 +89,49 @@ public class NetconfSessionImpl implements NetconfSession {
75 deviceInfo.name(), deviceInfo.getKeyFile(), 89 deviceInfo.name(), deviceInfo.getKeyFile(),
76 deviceInfo.password()); 90 deviceInfo.password());
77 } else { 91 } else {
78 - log.info("authenticate with username {} and password {}", 92 + log.debug("Authenticating to device {} with username {}",
79 - deviceInfo.name(), deviceInfo.password()); 93 + deviceInfo.getDeviceId(), deviceInfo.name(), deviceInfo.password());
80 isAuthenticated = netconfConnection.authenticateWithPassword( 94 isAuthenticated = netconfConnection.authenticateWithPassword(
81 deviceInfo.name(), deviceInfo.password()); 95 deviceInfo.name(), deviceInfo.password());
82 } 96 }
83 } catch (IOException e) { 97 } catch (IOException e) {
84 - throw new IOException("Authentication connection failed:" + 98 + log.error("Authentication connection to device " +
85 - e.getMessage()); 99 + deviceInfo.getDeviceId() + " failed:" +
100 + e.getMessage());
101 + throw new NetconfException("Authentication connection to device " +
102 + deviceInfo.getDeviceId() + " failed", e);
86 } 103 }
87 104
88 connectionActive = true; 105 connectionActive = true;
89 Preconditions.checkArgument(isAuthenticated, 106 Preconditions.checkArgument(isAuthenticated,
90 - "Authentication password and username failed"); 107 + "Authentication to device {} with username " +
108 + "{} Failed",
109 + deviceInfo.getDeviceId(), deviceInfo.name(),
110 + deviceInfo.password());
91 startSshSession(); 111 startSshSession();
92 } 112 }
93 } 113 }
94 114
95 - private void startSshSession() throws IOException { 115 + private void startSshSession() throws NetconfException {
96 try { 116 try {
97 sshSession = netconfConnection.openSession(); 117 sshSession = netconfConnection.openSession();
98 sshSession.startSubSystem("netconf"); 118 sshSession.startSubSystem("netconf");
99 - bufferReader = new BufferedReader(new InputStreamReader(new StreamGobbler(
100 - sshSession.getStdout())));
101 out = new PrintWriter(sshSession.getStdin()); 119 out = new PrintWriter(sshSession.getStdin());
120 + t = new NetconfStreamThread(sshSession.getStdout(), sshSession.getStdin(),
121 + sshSession.getStderr(), deviceInfo,
122 + new NetconfSessionDelegateImpl());
123 + this.addDeviceOutputListener(new NetconfDeviceOutputEventListenerImpl(deviceInfo));
102 sendHello(); 124 sendHello();
103 } catch (IOException e) { 125 } catch (IOException e) {
104 - throw new IOException("Failed to create ch.ethz.ssh2.Session session:" + 126 + log.error("Failed to create ch.ethz.ssh2.Session session:" +
105 - e.getMessage()); 127 + e.getMessage());
128 + throw new NetconfException("Failed to create ch.ethz.ssh2.Session session with device" +
129 + deviceInfo, e);
106 } 130 }
107 } 131 }
108 132
109 private void sendHello() throws IOException { 133 private void sendHello() throws IOException {
110 - serverCapabilities = doRequest(createHelloString()); 134 + serverCapabilities = sendRequest(createHelloString());
111 } 135 }
112 136
113 private String createHelloString() { 137 private String createHelloString() {
...@@ -119,58 +143,68 @@ public class NetconfSessionImpl implements NetconfSession { ...@@ -119,58 +143,68 @@ public class NetconfSessionImpl implements NetconfSession {
119 cap -> hellobuffer.append(" <capability>" + cap + "</capability>\n")); 143 cap -> hellobuffer.append(" <capability>" + cap + "</capability>\n"));
120 hellobuffer.append(" </capabilities>\n"); 144 hellobuffer.append(" </capabilities>\n");
121 hellobuffer.append("</hello>\n"); 145 hellobuffer.append("</hello>\n");
122 - hellobuffer.append(endpattern); 146 + hellobuffer.append(ENDPATTERN);
123 return hellobuffer.toString(); 147 return hellobuffer.toString();
124 148
125 } 149 }
126 150
127 - @Override 151 + private void checkAndRestablishSession() throws NetconfException {
128 - public String doRPC(String request) throws IOException {
129 - String reply = doRequest(request + "\n" + endpattern);
130 - return checkReply(reply) ? reply : "ERROR " + reply;
131 - }
132 -
133 - private String doRequest(String request) throws IOException {
134 - //log.info("sshState " + sshSession.getState() + "request" + request);
135 - checkAndRestablishSession();
136 - //log.info("sshState after" + sshSession.getState());
137 - out.print(request);
138 - out.flush();
139 - messageID++;
140 - return readOne();
141 - }
142 -
143 - private void checkAndRestablishSession() throws IOException {
144 if (sshSession.getState() != 2) { 152 if (sshSession.getState() != 2) {
145 try { 153 try {
146 startSshSession(); 154 startSshSession();
147 } catch (IOException e) { 155 } catch (IOException e) {
148 - log.info("the connection had to be reopened"); 156 + log.debug("The connection with {} had to be reopened", deviceInfo.getDeviceId());
149 try { 157 try {
150 startConnection(); 158 startConnection();
151 } catch (IOException e2) { 159 } catch (IOException e2) {
152 log.error("No connection {} for device, exception {}", netconfConnection, e2); 160 log.error("No connection {} for device, exception {}", netconfConnection, e2);
153 - throw new IOException(e.getMessage()); 161 + throw new NetconfException("Cannot re-open the connection with device" + deviceInfo, e);
154 - //TODO remove device from ONOS
155 } 162 }
156 } 163 }
157 } 164 }
158 } 165 }
159 166
160 @Override 167 @Override
161 - public String get(String request) throws IOException { 168 + public String requestSync(String request) throws NetconfException {
162 - return doRPC(request); 169 + String reply = sendRequest(request + NEW_LINE + ENDPATTERN);
170 + return checkReply(reply) ? reply : "ERROR " + reply;
163 } 171 }
164 172
165 @Override 173 @Override
166 - public String getConfig(String targetConfiguration) throws IOException { 174 + public CompletableFuture<String> request(String request) {
175 + CompletableFuture<String> ftrep = t.sendMessage(request);
176 + replies.put(MESSAGE_ID_INTEGER.get(), ftrep);
177 + return ftrep;
178 + }
179 +
180 + private String sendRequest(String request) throws NetconfException {
181 + checkAndRestablishSession();
182 + //FIXME find out a better way to enforce the presence of message-id
183 + if (!request.contains(MESSAGE_ID_STRING) && !request.contains(HELLO)) {
184 + request = request.replaceFirst("\">", "\" message-id=\""
185 + + MESSAGE_ID_INTEGER.get() + "\"" + ">");
186 + }
187 + CompletableFuture<String> futureReply = request(request);
188 + MESSAGE_ID_INTEGER.incrementAndGet();
189 + String rp = futureReply.join();
190 + log.debug("Reply from device {}", rp);
191 + return rp;
192 + }
193 +
194 + @Override
195 + public String get(String request) throws NetconfException {
196 + return requestSync(request);
197 + }
198 +
199 + @Override
200 + public String getConfig(String targetConfiguration) throws NetconfException {
167 return getConfig(targetConfiguration, null); 201 return getConfig(targetConfiguration, null);
168 } 202 }
169 203
170 @Override 204 @Override
171 - public String getConfig(String targetConfiguration, String configurationSchema) throws IOException { 205 + public String getConfig(String targetConfiguration, String configurationSchema) throws NetconfException {
172 StringBuilder rpc = new StringBuilder("<?xml version=\"1.0\" encoding=\"UTF-8\"?>"); 206 StringBuilder rpc = new StringBuilder("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
173 - rpc.append("<rpc message-id=\"" + messageID + "\" " 207 + rpc.append("<rpc message-id=\"" + MESSAGE_ID_INTEGER.get() + "\" "
174 + "xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n"); 208 + "xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n");
175 rpc.append("<get-config>\n"); 209 rpc.append("<get-config>\n");
176 rpc.append("<source>\n"); 210 rpc.append("<source>\n");
...@@ -183,23 +217,23 @@ public class NetconfSessionImpl implements NetconfSession { ...@@ -183,23 +217,23 @@ public class NetconfSessionImpl implements NetconfSession {
183 } 217 }
184 rpc.append("</get-config>\n"); 218 rpc.append("</get-config>\n");
185 rpc.append("</rpc>\n"); 219 rpc.append("</rpc>\n");
186 - rpc.append(endpattern); 220 + rpc.append(ENDPATTERN);
187 - String reply = doRequest(rpc.toString()); 221 + String reply = sendRequest(rpc.toString());
188 return checkReply(reply) ? reply : "ERROR " + reply; 222 return checkReply(reply) ? reply : "ERROR " + reply;
189 } 223 }
190 224
191 @Override 225 @Override
192 - public boolean editConfig(String newConfiguration) throws IOException { 226 + public boolean editConfig(String newConfiguration) throws NetconfException {
193 - newConfiguration = newConfiguration + endpattern; 227 + newConfiguration = newConfiguration + ENDPATTERN;
194 - return checkReply(doRequest(newConfiguration)); 228 + return checkReply(sendRequest(newConfiguration));
195 } 229 }
196 230
197 @Override 231 @Override
198 public boolean editConfig(String targetConfiguration, String mode, String newConfiguration) 232 public boolean editConfig(String targetConfiguration, String mode, String newConfiguration)
199 - throws IOException { 233 + throws NetconfException {
200 newConfiguration = newConfiguration.trim(); 234 newConfiguration = newConfiguration.trim();
201 StringBuilder rpc = new StringBuilder("<?xml version=\"1.0\" encoding=\"UTF-8\"?>"); 235 StringBuilder rpc = new StringBuilder("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
202 - rpc.append("<rpc message-id=\"" + messageID + "\" " 236 + rpc.append("<rpc message-id=\"" + MESSAGE_ID_INTEGER.get() + "\" "
203 + "xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n"); 237 + "xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n");
204 rpc.append("<edit-config>"); 238 rpc.append("<edit-config>");
205 rpc.append("<target>"); 239 rpc.append("<target>");
...@@ -213,13 +247,13 @@ public class NetconfSessionImpl implements NetconfSession { ...@@ -213,13 +247,13 @@ public class NetconfSessionImpl implements NetconfSession {
213 rpc.append("</config>"); 247 rpc.append("</config>");
214 rpc.append("</edit-config>"); 248 rpc.append("</edit-config>");
215 rpc.append("</rpc>"); 249 rpc.append("</rpc>");
216 - rpc.append(endpattern); 250 + rpc.append(ENDPATTERN);
217 - return checkReply(doRequest(rpc.toString())); 251 + return checkReply(sendRequest(rpc.toString()));
218 } 252 }
219 253
220 @Override 254 @Override
221 public boolean copyConfig(String targetConfiguration, String newConfiguration) 255 public boolean copyConfig(String targetConfiguration, String newConfiguration)
222 - throws IOException { 256 + throws NetconfException {
223 newConfiguration = newConfiguration.trim(); 257 newConfiguration = newConfiguration.trim();
224 if (!newConfiguration.startsWith("<configuration>")) { 258 if (!newConfiguration.startsWith("<configuration>")) {
225 newConfiguration = "<configuration>" + newConfiguration 259 newConfiguration = "<configuration>" + newConfiguration
...@@ -237,12 +271,12 @@ public class NetconfSessionImpl implements NetconfSession { ...@@ -237,12 +271,12 @@ public class NetconfSessionImpl implements NetconfSession {
237 rpc.append("</source>"); 271 rpc.append("</source>");
238 rpc.append("</copy-config>"); 272 rpc.append("</copy-config>");
239 rpc.append("</rpc>"); 273 rpc.append("</rpc>");
240 - rpc.append(endpattern); 274 + rpc.append(ENDPATTERN);
241 - return checkReply(doRequest(rpc.toString())); 275 + return checkReply(sendRequest(rpc.toString()));
242 } 276 }
243 277
244 @Override 278 @Override
245 - public boolean deleteConfig(String targetConfiguration) throws IOException { 279 + public boolean deleteConfig(String targetConfiguration) throws NetconfException {
246 if (targetConfiguration.equals("running")) { 280 if (targetConfiguration.equals("running")) {
247 log.warn("Target configuration for delete operation can't be \"running\"", 281 log.warn("Target configuration for delete operation can't be \"running\"",
248 targetConfiguration); 282 targetConfiguration);
...@@ -257,12 +291,12 @@ public class NetconfSessionImpl implements NetconfSession { ...@@ -257,12 +291,12 @@ public class NetconfSessionImpl implements NetconfSession {
257 rpc.append("</target>"); 291 rpc.append("</target>");
258 rpc.append("</delete-config>"); 292 rpc.append("</delete-config>");
259 rpc.append("</rpc>"); 293 rpc.append("</rpc>");
260 - rpc.append(endpattern); 294 + rpc.append(ENDPATTERN);
261 - return checkReply(doRequest(rpc.toString())); 295 + return checkReply(sendRequest(rpc.toString()));
262 } 296 }
263 297
264 @Override 298 @Override
265 - public boolean lock() throws IOException { 299 + public boolean lock() throws NetconfException {
266 StringBuilder rpc = new StringBuilder("<?xml version=\"1.0\" " + 300 StringBuilder rpc = new StringBuilder("<?xml version=\"1.0\" " +
267 "encoding=\"UTF-8\"?>"); 301 "encoding=\"UTF-8\"?>");
268 rpc.append("<rpc>"); 302 rpc.append("<rpc>");
...@@ -272,12 +306,12 @@ public class NetconfSessionImpl implements NetconfSession { ...@@ -272,12 +306,12 @@ public class NetconfSessionImpl implements NetconfSession {
272 rpc.append("</target>"); 306 rpc.append("</target>");
273 rpc.append("</lock>"); 307 rpc.append("</lock>");
274 rpc.append("</rpc>"); 308 rpc.append("</rpc>");
275 - rpc.append(endpattern); 309 + rpc.append(ENDPATTERN);
276 - return checkReply(doRequest(rpc.toString())); 310 + return checkReply(sendRequest(rpc.toString()));
277 } 311 }
278 312
279 @Override 313 @Override
280 - public boolean unlock() throws IOException { 314 + public boolean unlock() throws NetconfException {
281 StringBuilder rpc = new StringBuilder("<?xml version=\"1.0\" " + 315 StringBuilder rpc = new StringBuilder("<?xml version=\"1.0\" " +
282 "encoding=\"UTF-8\"?>"); 316 "encoding=\"UTF-8\"?>");
283 rpc.append("<rpc>"); 317 rpc.append("<rpc>");
...@@ -287,16 +321,16 @@ public class NetconfSessionImpl implements NetconfSession { ...@@ -287,16 +321,16 @@ public class NetconfSessionImpl implements NetconfSession {
287 rpc.append("</target>"); 321 rpc.append("</target>");
288 rpc.append("</unlock>"); 322 rpc.append("</unlock>");
289 rpc.append("</rpc>"); 323 rpc.append("</rpc>");
290 - rpc.append(endpattern); 324 + rpc.append(ENDPATTERN);
291 - return checkReply(doRequest(rpc.toString())); 325 + return checkReply(sendRequest(rpc.toString()));
292 } 326 }
293 327
294 @Override 328 @Override
295 - public boolean close() throws IOException { 329 + public boolean close() throws NetconfException {
296 return close(false); 330 return close(false);
297 } 331 }
298 332
299 - private boolean close(boolean force) throws IOException { 333 + private boolean close(boolean force) throws NetconfException {
300 StringBuilder rpc = new StringBuilder(); 334 StringBuilder rpc = new StringBuilder();
301 rpc.append("<rpc>"); 335 rpc.append("<rpc>");
302 if (force) { 336 if (force) {
...@@ -306,8 +340,8 @@ public class NetconfSessionImpl implements NetconfSession { ...@@ -306,8 +340,8 @@ public class NetconfSessionImpl implements NetconfSession {
306 } 340 }
307 rpc.append("<close-configuration/>"); 341 rpc.append("<close-configuration/>");
308 rpc.append("</rpc>"); 342 rpc.append("</rpc>");
309 - rpc.append(endpattern); 343 + rpc.append(ENDPATTERN);
310 - return checkReply(doRequest(rpc.toString())) || close(true); 344 + return checkReply(sendRequest(rpc.toString())) || close(true);
311 } 345 }
312 346
313 @Override 347 @Override
...@@ -335,7 +369,17 @@ public class NetconfSessionImpl implements NetconfSession { ...@@ -335,7 +369,17 @@ public class NetconfSessionImpl implements NetconfSession {
335 deviceCapabilities = capabilities; 369 deviceCapabilities = capabilities;
336 } 370 }
337 371
338 - private boolean checkReply(String reply) { 372 + @Override
373 + public void addDeviceOutputListener(NetconfDeviceOutputEventListener listener) {
374 + t.addDeviceEventListener(listener);
375 + }
376 +
377 + @Override
378 + public void removeDeviceOutputListener(NetconfDeviceOutputEventListener listener) {
379 + t.removeDeviceEventListener(listener);
380 + }
381 +
382 + private boolean checkReply(String reply) throws NetconfException {
339 if (reply != null) { 383 if (reply != null) {
340 if (!reply.contains("<rpc-error>")) { 384 if (!reply.contains("<rpc-error>")) {
341 return true; 385 return true;
...@@ -345,36 +389,18 @@ public class NetconfSessionImpl implements NetconfSession { ...@@ -345,36 +389,18 @@ public class NetconfSessionImpl implements NetconfSession {
345 return true; 389 return true;
346 } 390 }
347 } 391 }
348 - log.warn("Error in reply {}", reply); 392 + log.warn("Device " + deviceInfo + "has error in reply {}", reply);
349 return false; 393 return false;
350 } 394 }
351 395
352 - private String readOne() throws IOException { 396 + public class NetconfSessionDelegateImpl implements NetconfSessionDelegate {
353 - //TODO try a simple string
354 - final StringWriter reply = new StringWriter();
355 - while (true) {
356 - int charRead = bufferReader.read();
357 - if (charRead == -1) {
358 - throw new IOException("Session closed");
359 - }
360 397
361 - for (int i = 0; i < endpattern.length(); i++) { 398 + @Override
362 - if (charRead == endpattern.charAt(i)) { 399 + public void notify(NetconfDeviceOutputEvent event) {
363 - if (i < endpattern.length() - 1) { 400 + CompletableFuture<String> completedReply = replies.get(event.getMessageID());
364 - charRead = bufferReader.read(); 401 + completedReply.complete(event.getMessagePayload());
365 - } else {
366 - return reply.getBuffer().toString();
367 - }
368 - } else {
369 - String s = endpattern.substring(0, i);
370 - for (int j = 0; i < s.length(); j++) {
371 - reply.write(s.charAt(j));
372 - }
373 - reply.write(charRead);
374 - break;
375 - }
376 - }
377 } 402 }
378 } 403 }
379 404
405 +
380 } 406 }
......
1 +/*
2 + * Copyright 2015 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +
17 +package org.onosproject.netconf.ctl;
18 +
19 +import org.onosproject.netconf.NetconfDeviceOutputEventListener;
20 +
21 +import java.util.concurrent.CompletableFuture;
22 +
23 +/**
24 + * Interface to represent an objects that does all the IO on a NETCONF session
25 + * with a device.
26 + */
27 +public interface NetconfStreamHandler {
28 + /**
29 + * Sends the request on the stream that is used to communicate to and from the device.
30 + *
31 + * @param request request to send to the physical device
32 + * @return a CompletableFuture of type String that will contain the response for the request.
33 + */
34 + CompletableFuture<String> sendMessage(String request);
35 +
36 + /**
37 + * Adds a listener for netconf events on the handled stream.
38 + *
39 + * @param listener Netconf device event listener
40 + */
41 + void addDeviceEventListener(NetconfDeviceOutputEventListener listener);
42 +
43 + /**
44 + * Removes a listener for netconf events on the handled stream.
45 + *
46 + * @param listener Netconf device event listener
47 + */
48 + void removeDeviceEventListener(NetconfDeviceOutputEventListener listener);
49 +}
1 +/*
2 + * Copyright 2015 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +
17 +package org.onosproject.netconf.ctl;
18 +
19 +import com.google.common.base.Preconditions;
20 +import com.google.common.collect.Lists;
21 +import org.onosproject.netconf.NetconfDeviceInfo;
22 +import org.onosproject.netconf.NetconfDeviceOutputEvent;
23 +import org.onosproject.netconf.NetconfDeviceOutputEventListener;
24 +import org.onosproject.netconf.NetconfException;
25 +import org.slf4j.Logger;
26 +import org.slf4j.LoggerFactory;
27 +
28 +import java.io.BufferedReader;
29 +import java.io.IOException;
30 +import java.io.InputStream;
31 +import java.io.InputStreamReader;
32 +import java.io.OutputStream;
33 +import java.io.PrintWriter;
34 +import java.util.List;
35 +import java.util.concurrent.CompletableFuture;
36 +
37 +/**
38 + * Thread that gets spawned each time a session is established and handles all the input
39 + * and output from the session's streams to and from the NETCONF device the session is
40 + * established with.
41 + */
42 +public class NetconfStreamThread extends Thread implements NetconfStreamHandler {
43 +
44 + private static final Logger log = LoggerFactory
45 + .getLogger(NetconfStreamThread.class);
46 + private static final String HELLO = "hello";
47 + private static final String END_PATTERN = "]]>]]>";
48 + private static final String RPC_REPLY = "rpc-reply";
49 + private static final String RPC_ERROR = "rpc-error";
50 + private static final String NOTIFICATION_LABEL = "<notification>";
51 +
52 + private static PrintWriter outputStream;
53 + private static NetconfDeviceInfo netconfDeviceInfo;
54 + private static NetconfSessionDelegate sessionDelegate;
55 + private static NetconfMessageState state;
56 + private static List<NetconfDeviceOutputEventListener> netconfDeviceEventListeners
57 + = Lists.newArrayList();
58 +
59 + public NetconfStreamThread(final InputStream in, final OutputStream out,
60 + final InputStream err, NetconfDeviceInfo deviceInfo,
61 + NetconfSessionDelegate delegate) {
62 + super(handler(in, err));
63 + outputStream = new PrintWriter(out);
64 + netconfDeviceInfo = deviceInfo;
65 + state = NetconfMessageState.NO_MATCHING_PATTERN;
66 + sessionDelegate = delegate;
67 + log.debug("Stream thread for device {} session started", deviceInfo);
68 + start();
69 + }
70 +
71 + @Override
72 + public CompletableFuture<String> sendMessage(String request) {
73 + outputStream.print(request);
74 + outputStream.flush();
75 + return new CompletableFuture<>();
76 + }
77 +
78 + public enum NetconfMessageState {
79 +
80 + NO_MATCHING_PATTERN {
81 + @Override
82 + NetconfMessageState evaluateChar(char c) {
83 + if (c == ']') {
84 + return FIRST_BRAKET;
85 + } else {
86 + return this;
87 + }
88 + }
89 + },
90 + FIRST_BRAKET {
91 + @Override
92 + NetconfMessageState evaluateChar(char c) {
93 + if (c == ']') {
94 + return SECOND_BRAKET;
95 + } else {
96 + return NO_MATCHING_PATTERN;
97 + }
98 + }
99 + },
100 + SECOND_BRAKET {
101 + @Override
102 + NetconfMessageState evaluateChar(char c) {
103 + if (c == '>') {
104 + return FIRST_BIGGER;
105 + } else {
106 + return NO_MATCHING_PATTERN;
107 + }
108 + }
109 + },
110 + FIRST_BIGGER {
111 + @Override
112 + NetconfMessageState evaluateChar(char c) {
113 + if (c == ']') {
114 + return THIRD_BRAKET;
115 + } else {
116 + return NO_MATCHING_PATTERN;
117 + }
118 + }
119 + },
120 + THIRD_BRAKET {
121 + @Override
122 + NetconfMessageState evaluateChar(char c) {
123 + if (c == ']') {
124 + return ENDING_BIGGER;
125 + } else {
126 + return NO_MATCHING_PATTERN;
127 + }
128 + }
129 + },
130 + ENDING_BIGGER {
131 + @Override
132 + NetconfMessageState evaluateChar(char c) {
133 + if (c == '>') {
134 + return END_PATTERN;
135 + } else {
136 + return NO_MATCHING_PATTERN;
137 + }
138 + }
139 + },
140 + END_PATTERN {
141 + @Override
142 + NetconfMessageState evaluateChar(char c) {
143 + return NO_MATCHING_PATTERN;
144 + }
145 + };
146 +
147 + abstract NetconfMessageState evaluateChar(char c);
148 + }
149 +
150 + private static Runnable handler(final InputStream in, final InputStream err) {
151 + BufferedReader bufferReader = new BufferedReader(new InputStreamReader(in));
152 + return () -> {
153 + try {
154 + boolean socketClosed = false;
155 + StringBuilder deviceReplyBuilder = new StringBuilder();
156 + while (!socketClosed) {
157 + int cInt = bufferReader.read();
158 + if (cInt == -1) {
159 + socketClosed = true;
160 + NetconfDeviceOutputEvent event = new NetconfDeviceOutputEvent(
161 + NetconfDeviceOutputEvent.Type.DEVICE_UNREGISTERED,
162 + null, null, -1, netconfDeviceInfo);
163 + netconfDeviceEventListeners.forEach(
164 + listener -> listener.event(event));
165 + }
166 + char c = (char) cInt;
167 + state = state.evaluateChar(c);
168 + deviceReplyBuilder.append(c);
169 + if (state == NetconfMessageState.END_PATTERN) {
170 + String deviceReply = deviceReplyBuilder.toString()
171 + .replace(END_PATTERN, "");
172 + if (deviceReply.contains(RPC_REPLY) ||
173 + deviceReply.contains(RPC_ERROR) ||
174 + deviceReply.contains(HELLO)) {
175 + NetconfDeviceOutputEvent event = new NetconfDeviceOutputEvent(
176 + NetconfDeviceOutputEvent.Type.DEVICE_REPLY,
177 + null, deviceReply, getMsgId(deviceReply), netconfDeviceInfo);
178 + sessionDelegate.notify(event);
179 + netconfDeviceEventListeners.forEach(
180 + listener -> listener.event(event));
181 + } else if (deviceReply.contains(NOTIFICATION_LABEL)) {
182 + final String finalDeviceReply = deviceReply;
183 + netconfDeviceEventListeners.forEach(
184 + listener -> listener.event(new NetconfDeviceOutputEvent(
185 + NetconfDeviceOutputEvent.Type.DEVICE_NOTIFICATION,
186 + null, finalDeviceReply, getMsgId(finalDeviceReply), netconfDeviceInfo)));
187 + } else {
188 + log.info("Error on replay from device {} ", deviceReply);
189 + }
190 + deviceReplyBuilder.setLength(0);
191 + }
192 + }
193 + } catch (IOException e) {
194 + log.warn("Error in reading from the session for device " + netconfDeviceInfo, e);
195 + throw new RuntimeException(new NetconfException("Error in reading from the session for device {}" +
196 + netconfDeviceInfo, e));
197 + //TODO should we send a socket closed message to listeners ?
198 + }
199 + };
200 + }
201 +
202 + private static int getMsgId(String reply) {
203 + if (!reply.contains(HELLO)) {
204 + String[] outer = reply.split("message-id=");
205 + Preconditions.checkArgument(outer.length != 1,
206 + "Error in retrieving the message id");
207 + String messageID = outer[1].substring(0, 3).replace("\"", "");
208 + Preconditions.checkNotNull(Integer.parseInt(messageID),
209 + "Error in retrieving the message id");
210 + return Integer.parseInt(messageID);
211 + } else {
212 + return 0;
213 + }
214 + }
215 +
216 + public void addDeviceEventListener(NetconfDeviceOutputEventListener listener) {
217 + if (!netconfDeviceEventListeners.contains(listener)) {
218 + netconfDeviceEventListeners.add(listener);
219 + }
220 + }
221 +
222 + public void removeDeviceEventListener(NetconfDeviceOutputEventListener listener) {
223 + netconfDeviceEventListeners.remove(listener);
224 + }
225 +}
...@@ -65,7 +65,7 @@ public class NetconfDeviceProvider extends AbstractProvider ...@@ -65,7 +65,7 @@ public class NetconfDeviceProvider extends AbstractProvider
65 protected DeviceProviderRegistry providerRegistry; 65 protected DeviceProviderRegistry providerRegistry;
66 66
67 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 67 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
68 - protected NetconfController controller; //where is initiated ? 68 + protected NetconfController controller;
69 69
70 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 70 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
71 protected NetworkConfigRegistry cfgService; 71 protected NetworkConfigRegistry cfgService;
...@@ -73,11 +73,13 @@ public class NetconfDeviceProvider extends AbstractProvider ...@@ -73,11 +73,13 @@ public class NetconfDeviceProvider extends AbstractProvider
73 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 73 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
74 protected CoreService coreService; 74 protected CoreService coreService;
75 75
76 + private static final String APP_NAME = "org.onosproject.netconf";
77 + private static final String SCHEME_NAME = "netconf";
78 + private static final String DEVICE_PROVIDER_PACKAGE = "org.onosproject.netconf.provider.device";
79 + private static final String UNKNOWN = "unknown";
76 80
77 private DeviceProviderService providerService; 81 private DeviceProviderService providerService;
78 private NetconfDeviceListener innerNodeListener = new InnerNetconfDeviceListener(); 82 private NetconfDeviceListener innerNodeListener = new InnerNetconfDeviceListener();
79 - protected static final String ISNOTNULL = "NetconfDeviceInfo is not null";
80 - private static final String UNKNOWN = "unknown";
81 83
82 private final ConfigFactory factory = 84 private final ConfigFactory factory =
83 new ConfigFactory<ApplicationId, NetconfProviderConfig>(APP_SUBJECT_FACTORY, 85 new ConfigFactory<ApplicationId, NetconfProviderConfig>(APP_SUBJECT_FACTORY,
...@@ -96,10 +98,10 @@ public class NetconfDeviceProvider extends AbstractProvider ...@@ -96,10 +98,10 @@ public class NetconfDeviceProvider extends AbstractProvider
96 @Activate 98 @Activate
97 public void activate() { 99 public void activate() {
98 providerService = providerRegistry.register(this); 100 providerService = providerRegistry.register(this);
101 + appId = coreService.registerApplication(APP_NAME);
99 cfgService.registerConfigFactory(factory); 102 cfgService.registerConfigFactory(factory);
100 cfgService.addListener(cfgLister); 103 cfgService.addListener(cfgLister);
101 controller.addDeviceListener(innerNodeListener); 104 controller.addDeviceListener(innerNodeListener);
102 - appId = coreService.registerApplication("org.onosproject.netconf");
103 connectDevices(); 105 connectDevices();
104 log.info("Started"); 106 log.info("Started");
105 } 107 }
...@@ -110,11 +112,12 @@ public class NetconfDeviceProvider extends AbstractProvider ...@@ -110,11 +112,12 @@ public class NetconfDeviceProvider extends AbstractProvider
110 providerRegistry.unregister(this); 112 providerRegistry.unregister(this);
111 providerService = null; 113 providerService = null;
112 cfgService.unregisterConfigFactory(factory); 114 cfgService.unregisterConfigFactory(factory);
115 + controller.removeDeviceListener(innerNodeListener);
113 log.info("Stopped"); 116 log.info("Stopped");
114 } 117 }
115 118
116 public NetconfDeviceProvider() { 119 public NetconfDeviceProvider() {
117 - super(new ProviderId("netconf", "org.onosproject.netconf.provider.device")); 120 + super(new ProviderId(SCHEME_NAME, DEVICE_PROVIDER_PACKAGE));
118 } 121 }
119 122
120 @Override 123 @Override
...@@ -142,15 +145,18 @@ public class NetconfDeviceProvider extends AbstractProvider ...@@ -142,15 +145,18 @@ public class NetconfDeviceProvider extends AbstractProvider
142 145
143 private class InnerNetconfDeviceListener implements NetconfDeviceListener { 146 private class InnerNetconfDeviceListener implements NetconfDeviceListener {
144 147
148 + private static final String IPADDRESS = "ipaddress";
149 + protected static final String ISNULL = "NetconfDeviceInfo is null";
150 +
145 @Override 151 @Override
146 public void deviceAdded(NetconfDeviceInfo nodeId) { 152 public void deviceAdded(NetconfDeviceInfo nodeId) {
147 - Preconditions.checkNotNull(nodeId, ISNOTNULL); 153 + Preconditions.checkNotNull(nodeId, ISNULL);
148 DeviceId deviceId = nodeId.getDeviceId(); 154 DeviceId deviceId = nodeId.getDeviceId();
149 //Netconf configuration object 155 //Netconf configuration object
150 ChassisId cid = new ChassisId(); 156 ChassisId cid = new ChassisId();
151 String ipAddress = nodeId.ip().toString(); 157 String ipAddress = nodeId.ip().toString();
152 SparseAnnotations annotations = DefaultAnnotations.builder() 158 SparseAnnotations annotations = DefaultAnnotations.builder()
153 - .set("ipaddress", ipAddress).build(); 159 + .set(IPADDRESS, ipAddress).build();
154 DeviceDescription deviceDescription = new DefaultDeviceDescription( 160 DeviceDescription deviceDescription = new DefaultDeviceDescription(
155 deviceId.uri(), 161 deviceId.uri(),
156 Device.Type.SWITCH, 162 Device.Type.SWITCH,
...@@ -164,7 +170,7 @@ public class NetconfDeviceProvider extends AbstractProvider ...@@ -164,7 +170,7 @@ public class NetconfDeviceProvider extends AbstractProvider
164 170
165 @Override 171 @Override
166 public void deviceRemoved(NetconfDeviceInfo nodeId) { 172 public void deviceRemoved(NetconfDeviceInfo nodeId) {
167 - Preconditions.checkNotNull(nodeId, ISNOTNULL); 173 + Preconditions.checkNotNull(nodeId, ISNULL);
168 DeviceId deviceId = nodeId.getDeviceId(); 174 DeviceId deviceId = nodeId.getDeviceId();
169 providerService.deviceDisconnected(deviceId); 175 providerService.deviceDisconnected(deviceId);
170 176
...@@ -184,7 +190,7 @@ public class NetconfDeviceProvider extends AbstractProvider ...@@ -184,7 +190,7 @@ public class NetconfDeviceProvider extends AbstractProvider
184 addr.ip(), 190 addr.ip(),
185 addr.port())); 191 addr.port()));
186 } catch (IOException e) { 192 } catch (IOException e) {
187 - log.warn("Can't connect to NETCONF " + 193 + log.info("Can't connect to NETCONF " +
188 "device on {}:{}", 194 "device on {}:{}",
189 addr.ip(), 195 addr.ip(),
190 addr.port()); 196 addr.port());
......