download.js
11.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.GridFSBucketReadStream = void 0;
const stream_1 = require("stream");
const error_1 = require("../error");
/**
* A readable stream that enables you to read buffers from GridFS.
*
* Do not instantiate this class directly. Use `openDownloadStream()` instead.
* @public
*/
class GridFSBucketReadStream extends stream_1.Readable {
    /**
     * @internal
     * @param chunks - Handle for chunks collection
     * @param files - Handle for files collection
     * @param readPreference - The read preference to use
     * @param filter - The filter to use to find the file document
     * @param options - Optional settings; `start` and `end` default to 0 when not supplied
     */
    constructor(chunks, files, readPreference, filter, options) {
        super();
        // Caller-supplied options win over the 0 defaults.
        const mergedOptions = { start: 0, end: 0, ...options };
        this.s = {
            bytesToTrim: 0,
            bytesToSkip: 0,
            bytesRead: 0,
            chunks,
            expected: 0,
            files,
            filter,
            init: false,
            expectedEnd: 0,
            options: mergedOptions,
            readPreference
        };
    }
    /**
     * Readable implementation hook: waits for the file document and then
     * reads the next chunk. Does nothing once the stream is destroyed.
     * Private Impl, do not call directly
     * @internal
     */
    _read() {
        if (!this.destroyed) {
            waitForFile(this, () => doRead(this));
        }
    }
    /**
     * Sets the 0-based offset in bytes to start streaming from. Throws
     * if the stream has already been initialized (see `throwIfInitialized`).
     *
     * @param start - 0-based offset in bytes to start streaming from
     * @returns this stream, to allow chaining
     */
    start(start = 0) {
        throwIfInitialized(this);
        const { options } = this.s;
        options.start = start;
        return this;
    }
    /**
     * Sets the 0-based offset in bytes at which to stop reading. Throws
     * if the stream has already been initialized (see `throwIfInitialized`).
     *
     * @param end - Offset in bytes to stop reading at
     * @returns this stream, to allow chaining
     */
    end(end = 0) {
        throwIfInitialized(this);
        const { options } = this.s;
        options.end = end;
        return this;
    }
    /**
     * Marks this stream as aborted: signals end-of-data with `push(null)`,
     * flags the stream as destroyed and closes the underlying cursor when
     * there is one. The 'close' event is emitted once the cursor has been
     * closed, or immediately when the stream was never initialized.
     *
     * @param callback - called when the cursor is successfully closed or an error occurred.
     */
    abort(callback) {
        this.push(null);
        this.destroyed = true;
        const { cursor } = this.s;
        if (!cursor) {
            if (!this.s.init) {
                // Never initialized, so no cursor will ever exist:
                // report 'close' right away.
                this.emit(GridFSBucketReadStream.CLOSE);
            }
            if (callback) {
                callback();
            }
            return;
        }
        cursor.close(error => {
            this.emit(GridFSBucketReadStream.CLOSE);
            if (callback) {
                callback(error);
            }
        });
    }
}
exports.GridFSBucketReadStream = GridFSBucketReadStream;
/**
 * Event names emitted by GridFSBucketReadStream, exposed as static
 * properties on the class.
 */
Object.assign(GridFSBucketReadStream, {
    /**
     * An error occurred
     * @event
     */
    ERROR: 'error',
    /**
     * Fires when the stream loaded the file document corresponding to the provided id.
     * @event
     */
    FILE: 'file',
    /**
     * Emitted when a chunk of data is available to be consumed.
     * @event
     */
    DATA: 'data',
    /**
     * Fired when the stream is exhausted (no more data events).
     * @event
     */
    END: 'end',
    /**
     * Fired when the stream is exhausted and the underlying cursor is killed
     * @event
     */
    CLOSE: 'close'
});
/**
 * Guards option setters: rejects changes once the stream's `s.init` flag is set.
 *
 * @param stream - The read stream whose state is inspected
 * @throws MongoGridFSStreamError when the stream has already been initialized
 */
function throwIfInitialized(stream) {
    const { init: alreadyInitialized } = stream.s;
    if (!alreadyInitialized) {
        return;
    }
    throw new error_1.MongoGridFSStreamError('Options cannot be changed after the stream is initialized');
}
/**
 * Fetches the next chunk document from the stream's cursor, validates it
 * against the expected chunk sequence and size, and pushes its bytes
 * (trimmed to the requested range) to the stream.
 *
 * Does nothing when the stream is destroyed or when the cursor or file
 * document is not available yet. Validation failures and cursor errors are
 * reported by emitting the 'error' event rather than by throwing.
 *
 * @param stream - The GridFSBucketReadStream being read
 */
function doRead(stream) {
if (stream.destroyed)
return;
if (!stream.s.cursor)
return;
if (!stream.s.file)
return;
stream.s.cursor.next((error, doc) => {
// The stream may have been destroyed while the fetch was in flight.
if (stream.destroyed) {
return;
}
if (error) {
stream.emit(GridFSBucketReadStream.ERROR, error);
return;
}
// No document: the cursor is exhausted. Signal end-of-data first, then
// close the cursor on the next tick and report 'close' (or 'error').
if (!doc) {
stream.push(null);
process.nextTick(() => {
if (!stream.s.cursor)
return;
stream.s.cursor.close(error => {
if (error) {
stream.emit(GridFSBucketReadStream.ERROR, error);
return;
}
stream.emit(GridFSBucketReadStream.CLOSE);
});
});
return;
}
if (!stream.s.file)
return;
const bytesRemaining = stream.s.file.length - stream.s.bytesRead;
// Post-increment: expectedN is the index this chunk must have, and the
// counter already points at the following chunk.
const expectedN = stream.s.expected++;
// A chunk holds a full chunkSize unless fewer bytes remain in the file.
const expectedLength = Math.min(stream.s.file.chunkSize, bytesRemaining);
if (doc.n > expectedN) {
return stream.emit(GridFSBucketReadStream.ERROR, new error_1.MongoGridFSChunkError(`ChunkIsMissing: Got unexpected n: ${doc.n}, expected: ${expectedN}`));
}
if (doc.n < expectedN) {
return stream.emit(GridFSBucketReadStream.ERROR, new error_1.MongoGridFSChunkError(`ExtraChunk: Got unexpected n: ${doc.n}, expected: ${expectedN}`));
}
// doc.data is either a Buffer or a wrapper exposing the bytes as `.buffer`.
let buf = Buffer.isBuffer(doc.data) ? doc.data : doc.data.buffer;
if (buf.byteLength !== expectedLength) {
// All file bytes were already read, so this chunk should not exist.
if (bytesRemaining <= 0) {
return stream.emit(GridFSBucketReadStream.ERROR, new error_1.MongoGridFSChunkError(`ExtraChunk: Got unexpected n: ${doc.n}, expected file length ${stream.s.file.length} bytes but already read ${stream.s.bytesRead} bytes`));
}
return stream.emit(GridFSBucketReadStream.ERROR, new error_1.MongoGridFSChunkError(`ChunkIsWrongSize: Got unexpected length: ${buf.byteLength}, expected: ${expectedLength}`));
}
stream.s.bytesRead += buf.byteLength;
if (buf.byteLength === 0) {
return stream.push(null);
}
let sliceStart = null;
let sliceEnd = null;
// bytesToSkip is consumed once (as the slice start of this chunk) and then
// cleared so later chunks are not trimmed at the front.
if (stream.s.bytesToSkip != null) {
sliceStart = stream.s.bytesToSkip;
stream.s.bytesToSkip = 0;
}
// True when this chunk is the last one expected (expectedEnd - 1).
const atEndOfStream = expectedN === stream.s.expectedEnd - 1;
// NOTE: when bytesToSkip was non-null it has just been reset to 0 above,
// so in that case this evaluates to options.end.
const bytesLeftToRead = stream.s.options.end - stream.s.bytesToSkip;
if (atEndOfStream && stream.s.bytesToTrim != null) {
// Drop the trailing bytesToTrim bytes of a full-size chunk.
sliceEnd = stream.s.file.chunkSize - stream.s.bytesToTrim;
}
else if (stream.s.options.end && bytesLeftToRead < doc.data.byteLength) {
sliceEnd = bytesLeftToRead;
}
// Only slice when a bound was set; a 0/null bound falls back to the
// corresponding edge of the buffer.
if (sliceStart != null || sliceEnd != null) {
buf = buf.slice(sliceStart || 0, sliceEnd || buf.byteLength);
}
stream.push(buf);
return;
});
}
/**
 * Looks up the file document for the stream and, once it is found, prepares
 * everything needed to read its chunks: the start/end bookkeeping, the chunk
 * cursor and `stream.s.file`. Emits 'file' with the document when ready.
 *
 * Failures (query error, missing file, invalid start/end options) are
 * reported by emitting 'error'. An empty file ends the stream immediately.
 *
 * An `end` option of exactly 0 is treated as "read to the end of the file".
 * Without that, a `start` greater than 0 given with `end` left at 0 would be
 * rejected by the "start must not be greater than end" validation, and the
 * chunk limit would be computed as a negative number.
 *
 * @param stream - The GridFSBucketReadStream to initialize
 */
function init(stream) {
    const findOneOptions = {};
    if (stream.s.readPreference) {
        findOneOptions.readPreference = stream.s.readPreference;
    }
    if (stream.s.options && stream.s.options.sort) {
        findOneOptions.sort = stream.s.options.sort;
    }
    if (stream.s.options && stream.s.options.skip) {
        findOneOptions.skip = stream.s.options.skip;
    }
    stream.s.files.findOne(stream.s.filter, findOneOptions, (error, doc) => {
        if (error) {
            return stream.emit(GridFSBucketReadStream.ERROR, error);
        }
        if (!doc) {
            const identifier = stream.s.filter._id
                ? stream.s.filter._id.toString()
                : stream.s.filter.filename;
            const errmsg = `FileNotFound: file ${identifier} was not found`;
            // TODO(NODE-3483)
            const err = new error_1.MongoRuntimeError(errmsg);
            err.code = 'ENOENT'; // TODO: NODE-3338 set property as part of constructor
            return stream.emit(GridFSBucketReadStream.ERROR, err);
        }
        // If document is empty, kill the stream immediately and don't
        // execute any reads
        if (doc.length <= 0) {
            stream.push(null);
            return;
        }
        if (stream.destroyed) {
            // If user destroys the stream before we have a cursor, wait
            // until the query is done to say we're 'closed' because we can't
            // cancel a query.
            stream.emit(GridFSBucketReadStream.CLOSE);
            return;
        }
        const options = stream.s.options;
        // `end === 0` means no explicit end was requested.
        const readsToEndOfFile = options != null && options.end === 0;
        try {
            // Hide the 0 placeholder from the start validation so that it is
            // not compared against `start` as if it were a real end offset.
            stream.s.bytesToSkip = handleStartOption(stream, doc, readsToEndOfFile ? { ...options, end: undefined } : options);
        }
        catch (error) {
            return stream.emit(GridFSBucketReadStream.ERROR, error);
        }
        const filter = { files_id: doc._id };
        // Currently (MongoDB 3.4.4) skip function does not support the index,
        // it needs to retrieve all the documents first and then skip them. (CS-25811)
        // As work around we use $gte on the "n" field.
        if (options && options.start != null) {
            const skip = Math.floor(options.start / doc.chunkSize);
            if (skip > 0) {
                filter['n'] = { $gte: skip };
            }
        }
        stream.s.cursor = stream.s.chunks.find(filter).sort({ n: 1 });
        if (stream.s.readPreference) {
            stream.s.cursor.withReadPreference(stream.s.readPreference);
        }
        stream.s.expectedEnd = Math.ceil(doc.length / doc.chunkSize);
        stream.s.file = doc;
        if (readsToEndOfFile) {
            // No end offset: leave the cursor unlimited and disable tail
            // trimming. These are the values the end handling produces for
            // `end: 0` when reading starts in the first chunk.
            stream.s.expectedEnd = 0;
            stream.s.bytesToTrim = 0;
        }
        else {
            try {
                stream.s.bytesToTrim = handleEndOption(stream, doc, stream.s.cursor, options);
            }
            catch (error) {
                return stream.emit(GridFSBucketReadStream.ERROR, error);
            }
        }
        stream.emit(GridFSBucketReadStream.FILE, doc);
        return;
    });
}
/**
 * Runs `callback` once the stream's file document is available.
 *
 * When the document is already loaded the callback runs synchronously and
 * its result is returned. Otherwise initialization is started (only the
 * first time) and the callback is deferred to the next 'file' event.
 *
 * @param stream - The read stream whose file document is awaited
 * @param callback - Invoked without arguments once the file document is loaded
 */
function waitForFile(stream, callback) {
    const state = stream.s;
    if (state.file) {
        return callback();
    }
    const needsInit = !state.init;
    if (needsInit) {
        init(stream);
        // Flag is set only after init() returned without throwing.
        stream.s.init = true;
    }
    stream.once('file', () => callback());
}
/**
 * Validates the `start` option against the file document and positions the
 * stream's bookkeeping at the chunk containing `start`.
 *
 * Sets `stream.s.expected` to the index of that chunk and
 * `stream.s.bytesRead` to the byte offset at which that chunk begins.
 * `options.end`, when not null/undefined, is compared as-is against `start`.
 *
 * @param stream - The read stream whose state is updated
 * @param doc - The file document (`length` and `chunkSize` are used)
 * @param options - Stream options carrying `start` and optionally `end`
 * @returns the number of bytes to skip at the beginning of the first chunk read
 * @throws MongoInvalidArgumentError when `start` is missing, negative,
 * beyond the file length, or greater than `end`
 */
function handleStartOption(stream, doc, options) {
    if (!options || options.start == null) {
        throw new error_1.MongoInvalidArgumentError('Start option must be defined');
    }
    const requestedStart = options.start;
    if (requestedStart > doc.length) {
        throw new error_1.MongoInvalidArgumentError(`Stream start (${options.start}) must not be more than the length of the file (${doc.length})`);
    }
    if (requestedStart < 0) {
        throw new error_1.MongoInvalidArgumentError(`Stream start (${options.start}) must not be negative`);
    }
    const requestedEnd = options.end;
    if (requestedEnd != null && requestedEnd < requestedStart) {
        throw new error_1.MongoInvalidArgumentError(`Stream start (${options.start}) must not be greater than stream end (${options.end})`);
    }
    // Index of the chunk that contains the start offset.
    const firstChunkIndex = Math.floor(requestedStart / doc.chunkSize);
    stream.s.bytesRead = firstChunkIndex * doc.chunkSize;
    stream.s.expected = firstChunkIndex;
    return requestedStart - stream.s.bytesRead;
}
/**
 * Validates the `end` option against the file document, limits the chunk
 * cursor to the chunks needed to reach `end`, and records the index bound
 * of the last chunk in `stream.s.expectedEnd`.
 *
 * @param stream - The read stream whose state is updated
 * @param doc - The file document (`length` and `chunkSize` are used)
 * @param cursor - The chunk cursor; `limit()` is called on it
 * @param options - Stream options carrying `end` and `start`
 * @returns the number of bytes to trim from the end of the last chunk,
 * assuming that chunk is a full `chunkSize` long
 * @throws MongoInvalidArgumentError when `end` is missing, negative or beyond
 * the file length, or when `start` is missing or negative
 */
function handleEndOption(stream, doc, cursor, options) {
    if (!options || options.end == null) {
        throw new error_1.MongoInvalidArgumentError('End option must be defined');
    }
    const { start, end } = options;
    if (end > doc.length) {
        throw new error_1.MongoInvalidArgumentError(`Stream end (${end}) must not be more than the length of the file (${doc.length})`);
    }
    // Check `end` itself: a negative end would otherwise flow into the
    // chunk arithmetic below and produce a wrong cursor limit.
    if (end < 0) {
        throw new error_1.MongoInvalidArgumentError(`Stream end (${end}) must not be negative`);
    }
    // The chunk limit is computed relative to the start chunk, so a usable
    // start offset is required; report it as a start problem.
    if (start == null) {
        throw new error_1.MongoInvalidArgumentError('Start option must be defined');
    }
    if (start < 0) {
        throw new error_1.MongoInvalidArgumentError(`Stream start (${start}) must not be negative`);
    }
    const startChunk = Math.floor(start / doc.chunkSize);
    const endChunk = Math.ceil(end / doc.chunkSize);
    cursor.limit(endChunk - startChunk);
    stream.s.expectedEnd = endChunk;
    return endChunk * doc.chunkSize - end;
}
//# sourceMappingURL=download.js.map