index.js
4.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.StreamReader = exports.EndOfStreamError = void 0;
const EndOfFileStream_1 = require("./EndOfFileStream");
var EndOfFileStream_2 = require("./EndOfFileStream");
Object.defineProperty(exports, "EndOfStreamError", { enumerable: true, get: function () { return EndOfFileStream_2.EndOfStreamError; } });
class Deferred {
constructor() {
this.promise = new Promise((resolve, reject) => {
this.reject = reject;
this.resolve = resolve;
});
}
}
const maxStreamReadSize = 1 * 1024 * 1024; // Maximum request length on read-stream operation
class StreamReader {
constructor(s) {
this.s = s;
this.endOfStream = false;
/**
* Store peeked data
* @type {Array}
*/
this.peekQueue = [];
if (!s.read || !s.once) {
throw new Error('Expected an instance of stream.Readable');
}
this.s.once('end', () => this.reject(new EndOfFileStream_1.EndOfStreamError()));
this.s.once('error', err => this.reject(err));
this.s.once('close', () => this.reject(new Error('Stream closed')));
}
/**
* Read ahead (peek) from stream. Subsequent read or peeks will return the same data
* @param buffer - Buffer to store data read from stream in
* @param offset - Offset buffer
* @param length - Number of bytes to read
* @returns Number of bytes peeked
*/
async peek(buffer, offset, length) {
const bytesRead = await this.read(buffer, offset, length);
this.peekQueue.push(buffer.slice(offset, offset + bytesRead)); // Put read data back to peek buffer
return bytesRead;
}
/**
* Read chunk from stream
* @param buffer - Target buffer to store data read from stream in
* @param offset - Offset of target buffer
* @param length - Number of bytes to read
* @returns Number of bytes read
*/
async read(buffer, offset, length) {
if (length === 0) {
return 0;
}
if (this.peekQueue.length === 0 && this.endOfStream) {
throw new EndOfFileStream_1.EndOfStreamError();
}
let remaining = length;
let bytesRead = 0;
// consume peeked data first
while (this.peekQueue.length > 0 && remaining > 0) {
const peekData = this.peekQueue.pop(); // Front of queue
const lenCopy = Math.min(peekData.length, remaining);
peekData.copy(buffer, offset + bytesRead, 0, lenCopy);
bytesRead += lenCopy;
remaining -= lenCopy;
if (lenCopy < peekData.length) {
// remainder back to queue
this.peekQueue.push(peekData.slice(lenCopy));
}
}
// continue reading from stream if required
while (remaining > 0 && !this.endOfStream) {
const reqLen = Math.min(remaining, maxStreamReadSize);
const chunkLen = await this._read(buffer, offset + bytesRead, reqLen);
bytesRead += chunkLen;
if (chunkLen < reqLen)
break;
remaining -= chunkLen;
}
return bytesRead;
}
/**
* Read chunk from stream
* @param buffer Buffer to store data read from stream in
* @param offset Offset buffer
* @param length Number of bytes to read
* @returns Number of bytes read
*/
async _read(buffer, offset, length) {
if (this.request)
throw new Error('Concurrent read operation?');
const readBuffer = this.s.read(length);
if (readBuffer) {
readBuffer.copy(buffer, offset);
return readBuffer.length;
}
else {
this.request = {
buffer,
offset,
length,
deferred: new Deferred()
};
this.s.once('readable', () => {
this.tryRead();
});
return this.request.deferred.promise.then(n => {
this.request = null;
return n;
}, err => {
this.request = null;
throw err;
});
}
}
tryRead() {
const readBuffer = this.s.read(this.request.length);
if (readBuffer) {
readBuffer.copy(this.request.buffer, this.request.offset);
this.request.deferred.resolve(readBuffer.length);
}
else {
this.s.once('readable', () => {
this.tryRead();
});
}
}
reject(err) {
this.endOfStream = true;
if (this.request) {
this.request.deferred.reject(err);
this.request = null;
}
}
}
exports.StreamReader = StreamReader;