querystream.js 8.08 KB
/* eslint no-empty: 1 */

/*!
 * Module dependencies.
 */

var Stream = require('stream').Stream;
var utils = require('./utils');
var helpers = require('./queryhelpers');
var K = function(k) {
  return k;
};

/**
 * Provides a Node.js 0.8 style [ReadStream](http://nodejs.org/docs/v0.8.21/api/stream.html#stream_readable_stream) interface for Queries.
 *
 *     var stream = Model.find().stream();
 *
 *     stream.on('data', function (doc) {
 *       // do something with the mongoose document
 *     }).on('error', function (err) {
 *       // handle the error
 *     }).on('close', function () {
 *       // the stream is closed
 *     });
 *
 *
 * The stream interface allows us to simply "plug-in" to other _Node.js 0.8_ style write streams.
 *
 *     Model.where('created').gte(twoWeeksAgo).stream().pipe(writeStream);
 *
 * ####Valid options
 *
 *   - `transform`: optional function which accepts a mongoose document. The return value of the function will be emitted on `data`.
 *
 * ####Example
 *
 *     // JSON.stringify all documents before emitting
 *     var stream = Thing.find().stream({ transform: JSON.stringify });
 *     stream.pipe(writeStream);
 *
 * _NOTE: plugging into an HTTP response will *not* work out of the box. Those streams expect only strings or buffers to be emitted, so first formatting our documents as strings/buffers is necessary._
 *
 * _NOTE: these streams are Node.js 0.8 style read streams which differ from Node.js 0.10 style. Node.js 0.10 streams are not well tested yet and are not guaranteed to work._
 *
 * @param {Query} query
 * @param {Object} [options]
 * @inherits NodeJS Stream http://nodejs.org/docs/v0.8.21/api/stream.html#stream_readable_stream
 * @event `data`: emits a single Mongoose document
 * @event `error`: emits when an error occurs during streaming. This will emit _before_ the `close` event.
 * @event `close`: emits when the stream reaches the end of the cursor or an error occurs, or the stream is manually `destroy`ed. After this event, no more events are emitted.
 * @api public
 */

function QueryStream(query, options) {
  Stream.call(this);

  this.query = query;
  this.readable = true;
  this.paused = false;
  this._cursor = null;
  this._destroyed = null;
  this._fields = null;
  this._buffer = null;
  this._inline = T_INIT;
  this._running = false;
  this._transform = options && typeof options.transform === 'function'
      ? options.transform
      : K;

  // give time to hook up events
  var _this = this;
  process.nextTick(function() {
    _this._init();
  });
}

/*!
 * Inherit from Stream
 */

QueryStream.prototype.__proto__ = Stream.prototype;

/**
 * Flag stating whether or not this stream is readable.
 *
 * @property readable
 * @api public
 */

QueryStream.prototype.readable;

/**
 * Flag stating whether or not this stream is paused.
 *
 * @property paused
 * @api public
 */

QueryStream.prototype.paused;

// trampoline flags
var T_INIT = 0;
var T_IDLE = 1;
var T_CONT = 2;

/**
 * Initializes the query.
 *
 * @api private
 */

QueryStream.prototype._init = function() {
  if (this._destroyed) {
    return;
  }

  var query = this.query,
      model = query.model,
      options = query._optionsForExec(model),
      _this = this;

  try {
    query.cast(model);
  } catch (err) {
    return _this.destroy(err);
  }

  _this._fields = utils.clone(query._fields);
  options.fields = query._castFields(_this._fields);

  model.collection.find(query._conditions, options, function(err, cursor) {
    if (err) {
      return _this.destroy(err);
    }
    _this._cursor = cursor;
    _this._next();
  });
};

/**
 * Trampoline for pulling the next doc from cursor.
 *
 * @see QueryStream#__next #querystream_QueryStream-__next
 * @api private
 */

QueryStream.prototype._next = function _next() {
  if (this.paused || this._destroyed) {
    this._running = false;
    return this._running;
  }

  this._running = true;

  if (this._buffer && this._buffer.length) {
    var arg;
    while (!this.paused && !this._destroyed && (arg = this._buffer.shift())) { // eslint-disable-line no-cond-assign
      this._onNextObject.apply(this, arg);
    }
  }

  // avoid stack overflows with large result sets.
  // trampoline instead of recursion.
  while (this.__next()) {
  }
};

/**
 * Pulls the next doc from the cursor.
 *
 * @see QueryStream#_next #querystream_QueryStream-_next
 * @api private
 */

QueryStream.prototype.__next = function() {
  if (this.paused || this._destroyed) {
    this._running = false;
    return this._running;
  }

  var _this = this;
  _this._inline = T_INIT;

  _this._cursor.nextObject(function cursorcb(err, doc) {
    _this._onNextObject(err, doc);
  });

  // if onNextObject() was already called in this tick
  // return ourselves to the trampoline.
  if (T_CONT === this._inline) {
    return true;
  }
  // onNextObject() hasn't fired yet. tell onNextObject
  // that its ok to call _next b/c we are not within
  // the trampoline anymore.
  this._inline = T_IDLE;
};

/**
 * Transforms raw `doc`s returned from the cursor into a model instance.
 *
 * @param {Error|null} err
 * @param {Object} doc
 * @api private
 */

QueryStream.prototype._onNextObject = function _onNextObject(err, doc) {
  if (this._destroyed) {
    return;
  }

  if (this.paused) {
    this._buffer || (this._buffer = []);
    this._buffer.push([err, doc]);
    this._running = false;
    return this._running;
  }

  if (err) {
    return this.destroy(err);
  }

  // when doc is null we hit the end of the cursor
  if (!doc) {
    this.emit('end');
    return this.destroy();
  }

  var opts = this.query._mongooseOptions;

  if (!opts.populate) {
    return opts.lean === true ?
        emit(this, doc) :
        createAndEmit(this, null, doc);
  }

  var _this = this;
  var pop = helpers.preparePopulationOptionsMQ(_this.query, _this.query._mongooseOptions);

  // Hack to work around gh-3108
  pop.forEach(function(option) {
    delete option.model;
  });

  pop.__noPromise = true;
  _this.query.model.populate(doc, pop, function(err, doc) {
    if (err) {
      return _this.destroy(err);
    }
    return opts.lean === true ?
        emit(_this, doc) :
        createAndEmit(_this, pop, doc);
  });
};

function createAndEmit(self, populatedIds, doc) {
  var instance = helpers.createModel(self.query.model, doc, self._fields);
  var opts = populatedIds ?
    {populated: populatedIds} :
    undefined;

  instance.init(doc, opts, function(err) {
    if (err) {
      return self.destroy(err);
    }
    emit(self, instance);
  });
}

/*!
 * Emit a data event and manage the trampoline state
 */

function emit(self, doc) {
  self.emit('data', self._transform(doc));

  // trampoline management
  if (T_IDLE === self._inline) {
    // no longer in trampoline. restart it.
    self._next();
  } else {
    // in a trampoline. tell __next that its
    // ok to continue jumping.
    self._inline = T_CONT;
  }
}

/**
 * Pauses this stream.
 *
 * @api public
 */

QueryStream.prototype.pause = function() {
  this.paused = true;
};

/**
 * Resumes this stream.
 *
 * @api public
 */

QueryStream.prototype.resume = function() {
  this.paused = false;

  if (!this._cursor) {
    // cannot start if not initialized
    return;
  }

  // are we within the trampoline?
  if (T_INIT === this._inline) {
    return;
  }

  if (!this._running) {
    // outside QueryStream control, need manual restart
    return this._next();
  }
};

/**
 * Destroys the stream, closing the underlying cursor, which emits the close event. No more events will be emitted after the close event.
 *
 * @param {Error} [err]
 * @api public
 */

QueryStream.prototype.destroy = function(err) {
  if (this._destroyed) {
    return;
  }
  this._destroyed = true;
  this._running = false;
  this.readable = false;

  if (this._cursor) {
    this._cursor.close();
  }

  if (err) {
    this.emit('error', err);
  }

  this.emit('close');
};

/**
 * Pipes this query stream into another stream. This method is inherited from NodeJS Streams.
 *
 * ####Example:
 *
 *     query.stream().pipe(writeStream [, options])
 *
 * @method pipe
 * @memberOf QueryStream
 * @see NodeJS http://nodejs.org/api/stream.html
 * @api public
 */

/*!
 * Module exports
 */

module.exports = exports = QueryStream;