// index.js 5.47 KB
var Duplex = require('readable-stream').Duplex;
var PassThrough = require('readable-stream').PassThrough;
var Readable = require('readable-stream').Readable;
var inherits = require('inherits');

// Scheduler used to defer callbacks: setImmediate when the runtime defines
// it, process.nextTick otherwise.
var nextTick;
if (typeof setImmediate === 'undefined') {
    nextTick = process.nextTick;
}
else {
    nextTick = setImmediate;
}

// The module's export is the Pipeline constructor itself.
module.exports = Pipeline;
// Set up Pipeline's inheritance from Duplex via the `inherits` helper.
inherits(Pipeline, Duplex);

/**
 * Create a Pipeline with `objectMode` forced to true.
 *
 * Takes the same arguments as the Pipeline constructor: an optional array of
 * streams followed by an optional options object. When the first argument is
 * not an array and no second argument is given, the first argument is
 * treated as the options.
 *
 * @param {Array} [streams] - initial streams for the pipeline
 * @param {Object} [opts] - options handed to the Pipeline constructor
 * @returns {Pipeline} a new pipeline whose options have objectMode === true
 */
module.exports.obj = function (streams, opts) {
    if (!opts && !Array.isArray(streams)) {
        opts = streams;
        streams = [];
    }
    if (!streams) streams = [];

    // Work on a copy so the caller's options object is not modified: setting
    // objectMode on the original would leak into any other use of it.
    var objOpts = {};
    for (var key in opts) objOpts[key] = opts[key];
    objOpts.objectMode = true;

    return new Pipeline(streams, objOpts);
};

/**
 * Duplex stream made of an ordered, editable list of inner streams.
 *
 * May be called with or without `new`. When the first argument is not an
 * array and no second argument is given, the first argument is treated as
 * the options.
 *
 * @param {Array} [streams] - initial streams, inserted through splice()
 * @param {Object} [opts] - options passed to Duplex and kept on `_options`
 */
function Pipeline (streams, opts) {
    if (!(this instanceof Pipeline)) return new Pipeline(streams, opts);

    var onlyOptionsGiven = !opts && !Array.isArray(streams);
    if (onlyOptionsGiven) {
        opts = streams;
        streams = [];
    }
    streams = streams || [];
    opts = opts || {};

    Duplex.call(this, opts);

    var pipeline = this;
    pipeline._options = opts;
    // Wrapped classic streams are in object mode unless it was explicitly
    // disabled with objectMode: false.
    pipeline._wrapOptions = { objectMode: opts.objectMode !== false };
    pipeline._streams = [];

    // Insert the initial streams at index 0 without removing anything.
    var spliceArgs = [ 0, 0 ].concat(streams);
    pipeline.splice.apply(pipeline, spliceArgs);

    // When the writable side finishes, end the first inner stream (making
    // sure one exists first).
    pipeline.once('finish', function onFinish () {
        pipeline._notEmpty();
        pipeline._streams[0].end();
    });
}

/**
 * Readable-side hook: forward everything currently readable from the last
 * inner stream to this pipeline's own readable side.
 *
 * If nothing could be read, wait for either the last stream to signal
 * 'readable' or the pipeline to emit '_mutate', then try again.
 */
Pipeline.prototype._read = function () {
    var pipeline = this;
    pipeline._notEmpty();

    var tail = pipeline._streams[pipeline._streams.length - 1];
    var forwarded = 0;
    for (var chunk = tail.read(); chunk !== null; chunk = tail.read()) {
        // Pipeline.prototype.push is overridden to add streams, so call the
        // Duplex implementation explicitly to emit data.
        Duplex.prototype.push.call(pipeline, chunk);
        forwarded += 1;
    }
    if (forwarded !== 0) return;

    // Whichever event fires first detaches both listeners and retries.
    var retry = function () {
        tail.removeListener('readable', retry);
        pipeline.removeListener('_mutate', retry);
        pipeline._read();
    };
    tail.once('readable', retry);
    pipeline.once('_mutate', retry);
};

/**
 * Writable-side hook: hand the chunk to the first inner stream's _write,
 * creating a placeholder stream first if the pipeline is empty.
 *
 * @param {*} buf - chunk to write
 * @param {string} enc - encoding passed through unchanged
 * @param {Function} next - completion callback passed through unchanged
 */
Pipeline.prototype._write = function (buf, enc, next) {
    this._notEmpty();
    var head = this._streams[0];
    head._write(buf, enc, next);
};

/**
 * Guarantee the pipeline holds at least one stream.
 *
 * When the stream list is empty, a PassThrough built from the pipeline's
 * options is appended and `length` is refreshed. Does nothing otherwise.
 */
Pipeline.prototype._notEmpty = function () {
    if (this._streams.length !== 0) return;

    var pipeline = this;
    var filler = new PassThrough(pipeline._options);

    // If the placeholder ends while it is still the last stream, end the
    // pipeline's readable side as well.
    filler.once('end', function () {
        var position = pipeline._streams.indexOf(filler);
        var isLast = position === pipeline._streams.length - 1;
        if (position >= 0 && isLast) {
            Duplex.prototype.push.call(pipeline, null);
        }
    });

    pipeline._streams.push(filler);
    pipeline.length = pipeline._streams.length;
};

/**
 * Append one or more streams to the end of the pipeline.
 *
 * Every argument is forwarded to splice() as an inserted item.
 *
 * @param {...*} stream - streams to append
 * @returns {number} the number of streams in the pipeline afterwards
 */
Pipeline.prototype.push = function (stream) {
    var spliceArgs = [ this._streams.length, 0 ];
    spliceArgs.push.apply(spliceArgs, arguments);
    this.splice.apply(this, spliceArgs);
    return this._streams.length;
};

/**
 * Remove the last stream from the pipeline via splice().
 *
 * @returns {*} the first element of what splice() removed
 */
Pipeline.prototype.pop = function () {
    var lastIndex = this._streams.length - 1;
    var removed = this.splice(lastIndex, 1);
    return removed[0];
};

/**
 * Remove the first stream from the pipeline via splice().
 *
 * @returns {*} the first element of what splice() removed
 */
Pipeline.prototype.shift = function () {
    var removed = this.splice(0, 1);
    return removed[0];
};

/**
 * Insert one or more streams at the front of the pipeline.
 *
 * Every argument is forwarded to splice() as an inserted item.
 *
 * @returns {number} the number of streams in the pipeline afterwards
 */
Pipeline.prototype.unshift = function () {
    var spliceArgs = Array.prototype.slice.call(arguments);
    spliceArgs.unshift(0, 0);
    this.splice.apply(this, spliceArgs);
    return this._streams.length;
};

/**
 * Remove and/or insert streams at a position, keeping the pipe chain intact.
 *
 * Mirrors Array.prototype.splice: a negative `start` counts back from the
 * end, a `start` past the end is clamped to the end, and an omitted
 * `removeLen` removes everything from `start` onwards. Any further
 * arguments are the streams to insert at `start`:
 *   - an array argument is turned into a nested Pipeline sharing this
 *     pipeline's options;
 *   - every inserted stream has its 'error' events re-emitted on this
 *     pipeline, with `err.stream` set to the emitting stream;
 *   - each stream is passed through _wrapStream() before being stored.
 *
 * Emits '_mutate' after the stream list has changed and refreshes `length`.
 *
 * @param {number} start - index at which to start changing the pipeline
 * @param {number} [removeLen] - number of streams to remove
 * @returns {Array} the removed streams
 */
Pipeline.prototype.splice = function (start, removeLen) {
    var self = this;
    var len = this._streams.length;

    // Normalize the range the way Array.prototype.splice does. A negative
    // start is an offset from the end (len + start), and an out-of-range
    // start is clamped so the neighbour lookups below hit real streams.
    start = Number(start) || 0;
    start = start < 0 ? Math.max(0, len + start) : Math.min(start, len);
    if (removeLen === undefined) removeLen = len - start;
    removeLen = Math.max(0, Math.min(len - start, Number(removeLen) || 0));
    var end = start + removeLen;

    // Disconnect every link that touches the removed range, including the
    // link from the stream before it and the link into the stream after it.
    var i;
    for (i = start; i <= end; i++) {
        if (self._streams[i-1] && self._streams[i]) {
            self._streams[i-1].unpipe(self._streams[i]);
        }
    }

    var reps = [].slice.call(arguments, 2).map(function (stream) {
        if (Array.isArray(stream)) {
            stream = new Pipeline(stream, self._options);
        }
        stream.on('error', function (err) {
            err.stream = this;
            self.emit('error', err);
        });
        stream = self._wrapStream(stream);
        // When the stream ends while it is the last one in the pipeline,
        // end the pipeline's readable side too.
        stream.once('end', function () {
            var ix = self._streams.indexOf(stream);
            if (ix >= 0 && ix === self._streams.length - 1) {
                Duplex.prototype.push.call(self, null);
            }
        });
        return stream;
    });

    for (i = 0; i < reps.length - 1; i++) {
        reps[i].pipe(reps[i+1]);
    }

    var before = self._streams[start-1];
    var after = self._streams[end];
    if (reps.length) {
        if (after) reps[reps.length-1].pipe(after);
        if (before) before.pipe(reps[0]);
    }
    else if (before && after) {
        // Nothing was inserted: close the gap so data keeps flowing from
        // the stream before the removed range into the stream after it.
        before.pipe(after);
    }

    var sargs = [ start, removeLen ].concat(reps);
    var removed = self._streams.splice.apply(self._streams, sargs);

    for (i = 0; i < reps.length; i++) {
        reps[i].read(0);
    }

    this.emit('_mutate');
    this.length = this._streams.length;
    return removed;
};

/**
 * Look up a stream by index, optionally descending into nested pipelines.
 *
 * Each argument is an index into the current level's stream list; negative
 * indices count back from the end. With several arguments, each further
 * index is applied to the stream found by the previous one.
 *
 * @param {...number} index - one index per nesting level
 * @returns {*} the stream found, or undefined when called with no
 *   arguments, when an index has no stream, or when the path descends into
 *   something that holds no stream list
 */
Pipeline.prototype.get = function () {
    if (arguments.length === 0) return undefined;

    var base = this;
    for (var i = 0; i < arguments.length; i++) {
        // A stream without a stream list has nothing to descend into;
        // report "not found" rather than throwing a TypeError.
        var list = base._streams;
        if (!list) return undefined;

        var index = arguments[i];
        base = list[index < 0 ? list.length + index : index];
        if (!base) return undefined;
    }
    return base;
};

/**
 * Find the position of a stream in this pipeline's stream list.
 *
 * @param {*} stream - stream to look for
 * @returns {number} its index, or -1 when it is not in the list
 */
Pipeline.prototype.indexOf = function (stream) {
    var list = this._streams;
    return list.indexOf(stream);
};

/**
 * Adapt a stream that has no read() method so the pipeline can use it.
 *
 * Streams that already expose read() are returned untouched. Anything else
 * is wrapped in a Readable (built with `_wrapOptions`) via wrap(), and the
 * wrapper is given a _write that forwards chunks to the original stream.
 *
 * @param {*} stream - stream to adapt
 * @returns {*} the stream itself, or the Readable wrapper around it
 */
Pipeline.prototype._wrapStream = function (stream) {
    var hasRead = typeof stream.read === 'function';
    if (hasRead) return stream;

    var wrapper = new Readable(this._wrapOptions).wrap(stream);
    wrapper._write = function (chunk, encoding, done) {
        var accepted = stream.write(chunk) !== false;
        if (accepted) {
            nextTick(done);
        }
        else {
            // The stream reported back-pressure: continue once it drains.
            stream.once('drain', done);
        }
    };
    return wrapper;
};