Andrea Campanella
Committed by Gerrit Code Review

ONOS-3839 Fixing errors in sending requests and hanging on future.join

Change-Id: I6da5bf1ff728efeb0d531cf7f04f6bf49f11a0a9
......@@ -52,10 +52,11 @@ public class NetconfControllerConfig extends AbstractHandlerBehaviour
Preconditions.checkNotNull(controller, "Netconf controller is null");
List<ControllerInfo> controllers = new ArrayList<>();
try {
String reply = controller.getDevicesMap().get(ofDeviceId).getSession().
getConfig("running");
log.debug("Reply XML {}", reply);
controllers.addAll(XmlConfigParser.parseStreamControllers(XmlConfigParser.
loadXml(new ByteArrayInputStream(controller.
getDevicesMap().get(ofDeviceId).getSession().
getConfig("running").getBytes(StandardCharsets.UTF_8)))));
loadXml(new ByteArrayInputStream(reply.getBytes(StandardCharsets.UTF_8)))));
} catch (IOException e) {
log.error("Cannot comunicate to device {} ", ofDeviceId);
}
......@@ -70,16 +71,15 @@ public class NetconfControllerConfig extends AbstractHandlerBehaviour
Preconditions.checkNotNull(controller, "Netconf controller is null");
try {
NetconfDevice device = controller.getNetconfDevice(deviceId);
log.warn("provider map {}", controller.getDevicesMap());
String config = null;
try {
String reply = device.getSession().getConfig("running");
log.info("reply XML {}", reply);
config = XmlConfigParser.createControllersConfig(
XmlConfigParser.loadXml(getClass().getResourceAsStream("controllers.xml")),
XmlConfigParser.loadXml(
new ByteArrayInputStream(device.getSession()
.getConfig("running")
.getBytes(
StandardCharsets.UTF_8))),
new ByteArrayInputStream(reply.getBytes(StandardCharsets.UTF_8))),
"running", "merge", "create", controllers
);
} catch (IOException e) {
......
......@@ -91,14 +91,26 @@ public final class NetconfDeviceOutputEvent extends
this.messageID = msgID;
}
/**
* return the message payload of the reply form the device.
* @return reply
*/
public String getMessagePayload() {
return messagePayload;
}
/**
* Event-related device information.
* @return information about the device
*/
public NetconfDeviceInfo getDeviceInfo() {
return deviceInfo;
}
/**
* Reply messageId.
* @return messageId
*/
public Integer getMessageID() {
return messageID;
}
......
......@@ -28,12 +28,14 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
......@@ -48,17 +50,22 @@ public class NetconfSessionImpl implements NetconfSession {
private static final int CONNECTION_TIMEOUT = 0;
private static final String ENDPATTERN = "]]>]]>";
private static final AtomicInteger MESSAGE_ID_INTEGER = new AtomicInteger(0);
private static final String MESSAGE_ID_STRING = "message-id";
private static final String HELLO = "hello";
private static final String NEW_LINE = "\n";
private static final int FUTURE_REPLY_TIMEOUT = 5000;
private static final String ERROR = "ERROR ";
private static final String END_OF_RPC_OPEN_TAG = "\">";
private static final String EQUAL = "=";
private static final String NUMBER_BETWEEN_QUOTES_MATCHER = "\"+([0-9]+)+\"";
private static final String XML_HEADER =
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>";
private final AtomicInteger messageIdInteger = new AtomicInteger(0);
private Connection netconfConnection;
private NetconfDeviceInfo deviceInfo;
private Session sshSession;
private boolean connectionActive;
private PrintWriter out = null;
private List<String> deviceCapabilities =
Collections.singletonList("urn:ietf:params:netconf:base:1.0");
private String serverCapabilities;
......@@ -116,7 +123,6 @@ public class NetconfSessionImpl implements NetconfSession {
try {
sshSession = netconfConnection.openSession();
sshSession.startSubSystem("netconf");
out = new PrintWriter(sshSession.getStdin());
t = new NetconfStreamThread(sshSession.getStdout(), sshSession.getStdin(),
sshSession.getStderr(), deviceInfo,
new NetconfSessionDelegateImpl());
......@@ -130,17 +136,20 @@ public class NetconfSessionImpl implements NetconfSession {
}
}
private void sendHello() throws IOException {
private void sendHello() throws NetconfException {
serverCapabilities = sendRequest(createHelloString());
}
private String createHelloString() {
StringBuilder hellobuffer = new StringBuilder();
hellobuffer.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n");
hellobuffer.append(XML_HEADER);
hellobuffer.append("\n");
hellobuffer.append("<hello xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n");
hellobuffer.append(" <capabilities>\n");
deviceCapabilities.forEach(
cap -> hellobuffer.append(" <capability>" + cap + "</capability>\n"));
cap -> hellobuffer.append(" <capability>")
.append(cap)
.append("</capability>\n"));
hellobuffer.append(" </capabilities>\n");
hellobuffer.append("</hello>\n");
hellobuffer.append(ENDPATTERN);
......@@ -166,31 +175,62 @@ public class NetconfSessionImpl implements NetconfSession {
@Override
public String requestSync(String request) throws NetconfException {
String reply = sendRequest(request + NEW_LINE + ENDPATTERN);
return checkReply(reply) ? reply : "ERROR " + reply;
if (!request.contains(ENDPATTERN)) {
request = request + NEW_LINE + ENDPATTERN;
}
String reply = sendRequest(request);
return checkReply(reply) ? reply : ERROR + reply;
}
@Override
public CompletableFuture<String> request(String request) {
CompletableFuture<String> ftrep = t.sendMessage(request);
replies.put(MESSAGE_ID_INTEGER.get(), ftrep);
replies.put(messageIdInteger.get(), ftrep);
return ftrep;
}
private String sendRequest(String request) throws NetconfException {
checkAndRestablishSession();
//FIXME find out a better way to enforce the presence of message-id
if (!request.contains(MESSAGE_ID_STRING) && !request.contains(HELLO)) {
request = request.replaceFirst("\">", "\" message-id=\""
+ MESSAGE_ID_INTEGER.get() + "\"" + ">");
}
request = formatRequestMessageId(request);
request = formatXmlHeader(request);
CompletableFuture<String> futureReply = request(request);
MESSAGE_ID_INTEGER.incrementAndGet();
String rp = futureReply.join();
log.debug("Reply from device {}", rp);
String rp;
try {
rp = futureReply.get(FUTURE_REPLY_TIMEOUT, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
//replies.remove(messageIdInteger.get());
throw new NetconfException("Can't get the reply for request" + request, e);
}
// String rp = Tools.futureGetOrElse(futureReply, FUTURE_REPLY_TIMEOUT, TimeUnit.MILLISECONDS,
// "Error in completing the request with message-id " +
// messageIdInteger.get() +
// ": future timed out.");
messageIdInteger.incrementAndGet();
log.debug("Result {} from request {} to device {}", rp, request, deviceInfo);
return rp;
}
private String formatRequestMessageId(String request) {
if (request.contains(MESSAGE_ID_STRING)) {
//FIXME if application provieds his own counting of messages this fails that count
request = request.replaceFirst(MESSAGE_ID_STRING + EQUAL + NUMBER_BETWEEN_QUOTES_MATCHER,
MESSAGE_ID_STRING + EQUAL + "\"" + messageIdInteger.get() + "\"");
} else if (!request.contains(MESSAGE_ID_STRING) && !request.contains(HELLO)) {
//FIXME find out a better way to enforce the presence of message-id
request = request.replaceFirst(END_OF_RPC_OPEN_TAG, "\" " + MESSAGE_ID_STRING + EQUAL + "\""
+ messageIdInteger.get() + "\"" + ">");
}
return request;
}
private String formatXmlHeader(String request) {
if (!request.contains(XML_HEADER)) {
//FIXME if application provieds his own XML header of different type there is a clash
request = XML_HEADER + "\n" + request;
}
return request;
}
@Override
public String get(String request) throws NetconfException {
return requestSync(request);
......@@ -203,16 +243,21 @@ public class NetconfSessionImpl implements NetconfSession {
@Override
public String getConfig(String targetConfiguration, String configurationSchema) throws NetconfException {
StringBuilder rpc = new StringBuilder("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
rpc.append("<rpc message-id=\"" + MESSAGE_ID_INTEGER.get() + "\" "
+ "xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n");
StringBuilder rpc = new StringBuilder(XML_HEADER);
rpc.append("<rpc ");
rpc.append(MESSAGE_ID_STRING);
rpc.append(EQUAL);
rpc.append("\"");
rpc.append(messageIdInteger.get());
rpc.append("\" ");
rpc.append("xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n");
rpc.append("<get-config>\n");
rpc.append("<source>\n");
rpc.append("<" + targetConfiguration + "/>");
rpc.append("<").append(targetConfiguration).append("/>");
rpc.append("</source>");
if (configurationSchema != null) {
rpc.append("<filter type=\"subtree\">\n");
rpc.append(configurationSchema + "\n");
rpc.append(configurationSchema).append("\n");
rpc.append("</filter>\n");
}
rpc.append("</get-config>\n");
......@@ -232,12 +277,17 @@ public class NetconfSessionImpl implements NetconfSession {
public boolean editConfig(String targetConfiguration, String mode, String newConfiguration)
throws NetconfException {
newConfiguration = newConfiguration.trim();
StringBuilder rpc = new StringBuilder("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
rpc.append("<rpc message-id=\"" + MESSAGE_ID_INTEGER.get() + "\" "
+ "xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n");
StringBuilder rpc = new StringBuilder(XML_HEADER);
rpc.append("<rpc ");
rpc.append(MESSAGE_ID_STRING);
rpc.append(EQUAL);
rpc.append("\"");
rpc.append(messageIdInteger.get());
rpc.append("\" ");
rpc.append("xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n");
rpc.append("<edit-config>");
rpc.append("<target>");
rpc.append("<" + targetConfiguration + "/>");
rpc.append("<").append(targetConfiguration).append("/>");
rpc.append("</target>");
rpc.append("<default-operation>");
rpc.append(mode);
......@@ -259,15 +309,14 @@ public class NetconfSessionImpl implements NetconfSession {
newConfiguration = "<configuration>" + newConfiguration
+ "</configuration>";
}
StringBuilder rpc = new StringBuilder("<?xml version=\"1.0\" " +
"encoding=\"UTF-8\"?>");
StringBuilder rpc = new StringBuilder(XML_HEADER);
rpc.append("<rpc>");
rpc.append("<copy-config>");
rpc.append("<target>");
rpc.append("<" + targetConfiguration + "/>");
rpc.append("<").append(targetConfiguration).append("/>");
rpc.append("</target>");
rpc.append("<source>");
rpc.append("<" + newConfiguration + "/>");
rpc.append("<").append(newConfiguration).append("/>");
rpc.append("</source>");
rpc.append("</copy-config>");
rpc.append("</rpc>");
......@@ -282,12 +331,11 @@ public class NetconfSessionImpl implements NetconfSession {
targetConfiguration);
return false;
}
StringBuilder rpc = new StringBuilder("<?xml version=\"1.0\" " +
"encoding=\"UTF-8\"?>");
StringBuilder rpc = new StringBuilder(XML_HEADER);
rpc.append("<rpc>");
rpc.append("<delete-config>");
rpc.append("<target>");
rpc.append("<" + targetConfiguration + "/>");
rpc.append("<").append(targetConfiguration).append("/>");
rpc.append("</target>");
rpc.append("</delete-config>");
rpc.append("</rpc>");
......@@ -297,8 +345,7 @@ public class NetconfSessionImpl implements NetconfSession {
@Override
public boolean lock() throws NetconfException {
StringBuilder rpc = new StringBuilder("<?xml version=\"1.0\" " +
"encoding=\"UTF-8\"?>");
StringBuilder rpc = new StringBuilder(XML_HEADER);
rpc.append("<rpc>");
rpc.append("<lock>");
rpc.append("<target>");
......@@ -312,8 +359,7 @@ public class NetconfSessionImpl implements NetconfSession {
@Override
public boolean unlock() throws NetconfException {
StringBuilder rpc = new StringBuilder("<?xml version=\"1.0\" " +
"encoding=\"UTF-8\"?>");
StringBuilder rpc = new StringBuilder(XML_HEADER);
rpc.append("<rpc>");
rpc.append("<unlock>");
rpc.append("<target>");
......
......@@ -49,17 +49,20 @@ public class NetconfStreamThread extends Thread implements NetconfStreamHandler
private static final String RPC_ERROR = "rpc-error";
private static final String NOTIFICATION_LABEL = "<notification>";
private static PrintWriter outputStream;
private static NetconfDeviceInfo netconfDeviceInfo;
private static NetconfSessionDelegate sessionDelegate;
private static NetconfMessageState state;
private static List<NetconfDeviceOutputEventListener> netconfDeviceEventListeners
private PrintWriter outputStream;
private final InputStream err;
private final InputStream in;
private NetconfDeviceInfo netconfDeviceInfo;
private NetconfSessionDelegate sessionDelegate;
private NetconfMessageState state;
private List<NetconfDeviceOutputEventListener> netconfDeviceEventListeners
= Lists.newArrayList();
public NetconfStreamThread(final InputStream in, final OutputStream out,
final InputStream err, NetconfDeviceInfo deviceInfo,
NetconfSessionDelegate delegate) {
super(handler(in, err));
this.in = in;
this.err = err;
outputStream = new PrintWriter(out);
netconfDeviceInfo = deviceInfo;
state = NetconfMessageState.NO_MATCHING_PATTERN;
......@@ -70,6 +73,7 @@ public class NetconfStreamThread extends Thread implements NetconfStreamHandler
@Override
public CompletableFuture<String> sendMessage(String request) {
log.debug("Sending message {} to device {}", request, netconfDeviceInfo);
outputStream.print(request);
outputStream.flush();
return new CompletableFuture<>();
......@@ -147,9 +151,8 @@ public class NetconfStreamThread extends Thread implements NetconfStreamHandler
abstract NetconfMessageState evaluateChar(char c);
}
private static Runnable handler(final InputStream in, final InputStream err) {
public void run() {
BufferedReader bufferReader = new BufferedReader(new InputStreamReader(in));
return () -> {
try {
boolean socketClosed = false;
StringBuilder deviceReplyBuilder = new StringBuilder();
......@@ -157,6 +160,7 @@ public class NetconfStreamThread extends Thread implements NetconfStreamHandler
int cInt = bufferReader.read();
if (cInt == -1) {
socketClosed = true;
log.debug("char {} " + bufferReader.read());
NetconfDeviceOutputEvent event = new NetconfDeviceOutputEvent(
NetconfDeviceOutputEvent.Type.DEVICE_UNREGISTERED,
null, null, -1, netconfDeviceInfo);
......@@ -167,8 +171,15 @@ public class NetconfStreamThread extends Thread implements NetconfStreamHandler
state = state.evaluateChar(c);
deviceReplyBuilder.append(c);
if (state == NetconfMessageState.END_PATTERN) {
String deviceReply = deviceReplyBuilder.toString()
.replace(END_PATTERN, "");
String deviceReply = deviceReplyBuilder.toString();
if (deviceReply.equals(END_PATTERN)) {
NetconfDeviceOutputEvent event = new NetconfDeviceOutputEvent(
NetconfDeviceOutputEvent.Type.DEVICE_UNREGISTERED,
null, null, -1, netconfDeviceInfo);
netconfDeviceEventListeners.forEach(
listener -> listener.event(event));
} else {
deviceReply = deviceReply.replace(END_PATTERN, "");
if (deviceReply.contains(RPC_REPLY) ||
deviceReply.contains(RPC_ERROR) ||
deviceReply.contains(HELLO)) {
......@@ -183,20 +194,21 @@ public class NetconfStreamThread extends Thread implements NetconfStreamHandler
netconfDeviceEventListeners.forEach(
listener -> listener.event(new NetconfDeviceOutputEvent(
NetconfDeviceOutputEvent.Type.DEVICE_NOTIFICATION,
null, finalDeviceReply, getMsgId(finalDeviceReply), netconfDeviceInfo)));
null, finalDeviceReply, getMsgId(finalDeviceReply),
netconfDeviceInfo)));
} else {
log.info("Error on replay from device {} ", deviceReply);
}
deviceReplyBuilder.setLength(0);
}
}
}
} catch (IOException e) {
log.warn("Error in reading from the session for device " + netconfDeviceInfo, e);
throw new RuntimeException(new NetconfException("Error in reading from the session for device {}" +
netconfDeviceInfo, e));
//TODO should we send a socket closed message to listeners ?
}
};
}
private static int getMsgId(String reply) {
......