readable_streambuffer.js 3.03 KB
var stream = require("stream"),
	constants = require("./constants"),
	util = require("util");

var ReadableStreamBuffer = module.exports = function(opts) {
	var that = this;

	stream.Stream.call(this);

	opts = opts || {};
	var frequency = opts.hasOwnProperty("frequency") ? opts.frequency : constants.DEFAULT_FREQUENCY;
	var chunkSize = opts.chunkSize || constants.DEFAULT_CHUNK_SIZE;
	var initialSize = opts.initialSize || constants.DEFAULT_INITIAL_SIZE;
	var incrementAmount = opts.incrementAmount || constants.DEFAULT_INCREMENT_AMOUNT;

	var size = 0;
	var buffer = new Buffer(initialSize);
	var encoding = null;

	this.readable = true;
	this.writable = false;

	var sendData = function() {
		var amount = Math.min(chunkSize, size);

		if (amount > 0) {
			var chunk = null;
			if(encoding) {
				chunk = buffer.toString(encoding, 0, amount);
			}
			else {
				chunk = new Buffer(amount);
				buffer.copy(chunk, 0, 0, amount);
			}

			that.emit("data", chunk);

			if(amount < buffer.length)
				buffer.copy(buffer, 0, amount, size);
			size -= amount;
		}

		if(size === 0 && !that.readable) {
			that.emit("end");
			that.emit("close");
			if (sendData && sendData.interval) {
				clearInterval(sendData.interval);
				sendData.interval = null;
			}
		}
	};

	this.size = function() {
		return size;
	};

	this.maxSize = function() {
		return buffer.length;
	};

	var increaseBufferIfNecessary = function(incomingDataSize) {
		if((buffer.length - size) < incomingDataSize) {
			var factor = Math.ceil((incomingDataSize - (buffer.length - size)) / incrementAmount);

			var newBuffer = new Buffer(buffer.length + (incrementAmount * factor));
			buffer.copy(newBuffer, 0, 0, size);
			buffer = newBuffer;
		}
	};

	this.put = function(data, encoding) {
		if(!that.readable) return;

		var wasEmpty = size === 0;
		if(Buffer.isBuffer(data)) {
			increaseBufferIfNecessary(data.length);
			data.copy(buffer, size, 0);
			size += data.length;
		}
		else {
			data = data + "";
			var dataSizeInBytes = Buffer.byteLength(data);
			increaseBufferIfNecessary(dataSizeInBytes);
			buffer.write(data, size, encoding || "utf8");
			size += dataSizeInBytes;
		}

		if (wasEmpty && size > 0) {
			this.emit('readable')
		}

		if (!this.isPaused && !frequency) {
			while (size > 0) {
				sendData();
			}
		}
	};

	this.pause = function() {
		this.isPaused = true;
		if(sendData && sendData.interval) {
			clearInterval(sendData.interval);
			delete sendData.interval;
		}
	};

	this.resume = function() {
		this.isPaused = false;
		if(sendData && !sendData.interval && frequency > 0) {
			sendData.interval = setInterval(sendData, frequency);
		}
	};

	this.destroy = function() {
		that.emit("end");
		if(sendData.interval) clearInterval(sendData.interval);
		sendData = null;
		that.readable = false;
		that.emit("close");
	};

	this.destroySoon = function() {
		that.readable = false;
		if (!sendData.interval) {
			that.emit("end");
			that.emit("close");
		}
	};

	this.setEncoding = function(_encoding) {
		encoding = _encoding;
	};

	this.resume();
};
util.inherits(ReadableStreamBuffer, stream.Stream);