stream-client.js
1.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
var Client = require('./client');
var util = require('./util');
function StreamClient(options) {
options.server = {
stream: options.server,
buffer: [],
full: false
};
Client.apply(this, arguments);
this._callbacks = {};
this._server.stream.on('data', this._onData.bind(this));
this._server.stream.on('drain', this._onDrain.bind(this));
}
StreamClient.prototype = new Client({
server: true
});
StreamClient.prototype.constructor = StreamClient;
StreamClient.prototype._send = function (request) {
var success;
if (this._server.full) {
this._server.buffer.push(request);
} else {
try {
request = JSON.stringify(request);
} catch (e) {
throw 'Could not serialize request to JSON';
}
this._server.full = !this._server.stream.write(request);
}
};
StreamClient.prototype.request = function () {
var request = this._makeRequest.apply(this, arguments);
var callback;
var response;
request.id = this._nextId++;
if (request.callback) {
callback = request.callback;
delete request.callback;
} else if (util.isArray(request.params) &&
util.isFunction(request.params[request.params.length - 1])
) {
callback = request.params.pop();
}
this._send(request);
if (callback && util.isNumber(request.id)) {
this._callbacks[request.id] = callback;
}
};
StreamClient.prototype._onData = function (response) {
response = JSON.parse(response);
if (!util.isUndefined(response.id) && this._callbacks[response.id]) {
this._callbacks[response.id](response.error || null,
response.result || null);
delete this._callbacks[response.id];
}
};
StreamClient.prototype._onDrain = function (request) {
var buffer = this._server.buffer.slice().reverse();
this._server.full = false;
while (buffer.length > 0) {
this._send(buffer.pop());
}
};
module.exports = StreamClient;