Andreas Papazois
Committed by Gerrit Code Review

[ONOS-3918] Handling of NETCONF <rpc-error> and no message-id

Change-Id: I8b9396a727fb54b5b84d02f258c14cfccad5bb99
......@@ -18,6 +18,8 @@ package org.onosproject.netconf;
import org.onosproject.event.AbstractEvent;
import java.util.Optional;
/**
* Describes network configuration event.
*/
......@@ -25,7 +27,7 @@ public final class NetconfDeviceOutputEvent extends
AbstractEvent<NetconfDeviceOutputEvent.Type, Object> {
private final String messagePayload;
private final int messageID;
private final Optional<Integer> messageID;
private final NetconfDeviceInfo deviceInfo;
/**
......@@ -64,7 +66,8 @@ public final class NetconfDeviceOutputEvent extends
* @param msgID id of the message related to the event
* @param netconfDeviceInfo device of event
*/
public NetconfDeviceOutputEvent(Type type, Object subject, String payload, int msgID,
public NetconfDeviceOutputEvent(Type type, Object subject, String payload,
Optional<Integer> msgID,
NetconfDeviceInfo netconfDeviceInfo) {
super(type, subject);
messagePayload = payload;
......@@ -83,8 +86,10 @@ public final class NetconfDeviceOutputEvent extends
* @param msgID id of the message related to the event
* @param time occurrence time
*/
public NetconfDeviceOutputEvent(Type type, Object subject, String payload, int msgID,
NetconfDeviceInfo netconfDeviceInfo, long time) {
public NetconfDeviceOutputEvent(Type type, Object subject, String payload,
Optional<Integer> msgID,
NetconfDeviceInfo netconfDeviceInfo,
long time) {
super(type, subject, time);
messagePayload = payload;
deviceInfo = netconfDeviceInfo;
......@@ -111,7 +116,7 @@ public final class NetconfDeviceOutputEvent extends
* Reply messageId.
* @return messageId
*/
public Integer getMessageID() {
public Optional<Integer> getMessageID() {
return messageID;
}
}
......
......@@ -28,10 +28,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
......@@ -71,12 +73,14 @@ public class NetconfSessionImpl implements NetconfSession {
private String serverCapabilities;
private NetconfStreamHandler t;
private Map<Integer, CompletableFuture<String>> replies;
private List<String> errorReplies;
public NetconfSessionImpl(NetconfDeviceInfo deviceInfo) throws NetconfException {
this.deviceInfo = deviceInfo;
connectionActive = false;
replies = new HashMap<>();
errorReplies = new ArrayList<>();
startConnection();
}
......@@ -194,18 +198,13 @@ public class NetconfSessionImpl implements NetconfSession {
request = formatRequestMessageId(request);
request = formatXmlHeader(request);
CompletableFuture<String> futureReply = request(request);
messageIdInteger.incrementAndGet();
String rp;
try {
rp = futureReply.get(FUTURE_REPLY_TIMEOUT, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
//replies.remove(messageIdInteger.get());
throw new NetconfException("Can't get the reply for request" + request, e);
throw new NetconfException("No matching reply for request " + request, e);
}
// String rp = Tools.futureGetOrElse(futureReply, FUTURE_REPLY_TIMEOUT, TimeUnit.MILLISECONDS,
// "Error in completing the request with message-id " +
// messageIdInteger.get() +
// ": future timed out.");
messageIdInteger.incrementAndGet();
log.debug("Result {} from request {} to device {}", rp, request, deviceInfo);
return rp;
}
......@@ -442,8 +441,16 @@ public class NetconfSessionImpl implements NetconfSession {
public class NetconfSessionDelegateImpl implements NetconfSessionDelegate {
@Override
public void notify(NetconfDeviceOutputEvent event) {
CompletableFuture<String> completedReply = replies.get(event.getMessageID());
public void notify(NetconfDeviceOutputEvent event) {
Optional<Integer> messageId = event.getMessageID();
if (!messageId.isPresent()) {
errorReplies.add(event.getMessagePayload());
log.error("Device " + event.getDeviceInfo() +
" sent error reply " + event.getMessagePayload());
return;
}
CompletableFuture<String> completedReply =
replies.get(messageId.get());
completedReply.complete(event.getMessagePayload());
}
}
......
......@@ -32,6 +32,7 @@ import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
/**
......@@ -48,6 +49,7 @@ public class NetconfStreamThread extends Thread implements NetconfStreamHandler
private static final String RPC_REPLY = "rpc-reply";
private static final String RPC_ERROR = "rpc-error";
private static final String NOTIFICATION_LABEL = "<notification>";
private static final String MESSAGE_ID = "message-id=";
private PrintWriter outputStream;
private final InputStream err;
......@@ -163,7 +165,7 @@ public class NetconfStreamThread extends Thread implements NetconfStreamHandler
log.debug("char {} " + bufferReader.read());
NetconfDeviceOutputEvent event = new NetconfDeviceOutputEvent(
NetconfDeviceOutputEvent.Type.DEVICE_UNREGISTERED,
null, null, -1, netconfDeviceInfo);
null, null, Optional.of(-1), netconfDeviceInfo);
netconfDeviceEventListeners.forEach(
listener -> listener.event(event));
}
......@@ -175,7 +177,7 @@ public class NetconfStreamThread extends Thread implements NetconfStreamHandler
if (deviceReply.equals(END_PATTERN)) {
NetconfDeviceOutputEvent event = new NetconfDeviceOutputEvent(
NetconfDeviceOutputEvent.Type.DEVICE_UNREGISTERED,
null, null, -1, netconfDeviceInfo);
null, null, Optional.of(-1), netconfDeviceInfo);
netconfDeviceEventListeners.forEach(
listener -> listener.event(event));
} else {
......@@ -211,18 +213,20 @@ public class NetconfStreamThread extends Thread implements NetconfStreamHandler
}
}
private static int getMsgId(String reply) {
if (!reply.contains(HELLO)) {
String[] outer = reply.split("message-id=");
Preconditions.checkArgument(outer.length != 1,
"Error in retrieving the message id");
String messageID = outer[1].substring(0, 3).replace("\"", "");
Preconditions.checkNotNull(Integer.parseInt(messageID),
"Error in retrieving the message id");
return Integer.parseInt(messageID);
} else {
return 0;
private static Optional<Integer> getMsgId(String reply) {
if (reply.contains(HELLO)) {
return Optional.of(0);
}
if (reply.contains(RPC_ERROR) && !reply.contains(MESSAGE_ID)) {
return Optional.empty();
}
String[] outer = reply.split(MESSAGE_ID);
Preconditions.checkArgument(outer.length != 1,
"Error in retrieving the message id");
String messageID = outer[1].substring(0, 3).replace("\"", "");
Preconditions.checkNotNull(Integer.parseInt(messageID),
"Error in retrieving the message id");
return Optional.of(Integer.parseInt(messageID));
}
public void addDeviceEventListener(NetconfDeviceOutputEventListener listener) {
......