Bottleneck.js
4.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
var PriorityQueue = function(size) {
var me = {}, slots, i, total = null;
// initialize arrays to hold queue elements
size = Math.max(+size | 0, 1);
slots = [];
for (i = 0; i < size; i += 1) {
slots.push([]);
}
// Public methods
me.size = function () {
var i;
if (total === null) {
total = 0;
for (i = 0; i < size; i += 1) {
total += slots[i].length;
}
}
return total;
};
me.enqueue = function (obj, priority) {
var priorityOrig;
// Convert to integer with a default value of 0.
priority = priority && + priority | 0 || 0;
// Clear cache for total.
total = null;
if (priority) {
priorityOrig = priority;
if (priority < 0 || priority >= size) {
priority = (size - 1);
// put obj at the end of the line
console.error("invalid priority: " + priorityOrig + " must be between 0 and " + priority);
}
}
slots[priority].push(obj);
};
me.dequeue = function (callback) {
var obj = null, i, sl = slots.length;
// Clear cache for total.
total = null;
for (i = 0; i < sl; i += 1) {
if (slots[i].length) {
obj = slots[i].shift();
break;
}
}
return obj;
};
return me;
};
function Bottleneck(maxConcurrent, rateLimit, priorityRange, defaultPriority, cluster) {
if(isNaN(maxConcurrent) || isNaN(rateLimit)) {
throw "maxConcurrent and rateLimit must be numbers";
}
priorityRange = priorityRange || 1;
if(isNaN(priorityRange)) {
throw "priorityRange must be a number";
}
priorityRange = parseInt(priorityRange);
defaultPriority = defaultPriority ? defaultPriority : Math.floor(priorityRange/2);
if(isNaN(defaultPriority)) {
throw "defaultPriority must be a number";
}
defaultPriority = defaultPriority >= priorityRange ? priorityRange-1 : defaultPriority;
defaultPriority = parseInt(defaultPriority);
this.cluster = cluster;
this.rateLimit = parseInt(rateLimit);
this.maxConcurrent = this.rateLimit ? 1 : parseInt(maxConcurrent);
this._waitingClients = new PriorityQueue(priorityRange);
this._priorityRange = priorityRange;
this._defaultPriority = defaultPriority;
this._nextRequest = Date.now();
this._tasksRunning = 0;
}
Bottleneck.prototype.setName = function(name) {
this.name = name;
}
Bottleneck.prototype.setRateLimit = function(rateLimit) {
if(isNaN(rateLimit)) {
throw "rateLimit must be a number";
}
this.rateLimit = parseInt(rateLimit);
if(this.rateLimit > 0) {
this.maxConcurrent = 1;
}
}
Bottleneck.prototype.submit = function(options, clientCallback) {
var self = this;
var priority = null;
if(typeof options == "number") {
priority = parseInt(options);
} else {
priority = options.priority;
}
priority = Number.isInteger(priority) ? priority: self._defaultPriority;
priority = priority > self._priorityRange-1 ? self._priorityRange-1 : priority;
this._waitingClients.enqueue(clientCallback, priority);
self._tryToRun();
return;
}
Bottleneck.prototype._tryToRun = function() {
var self = this;
if(self._tasksRunning < self.maxConcurrent && self.hasWaitingClients()) {
++self._tasksRunning;
var wait = Math.max(self._nextRequest - Date.now(), 0);
self._nextRequest = Date.now() + wait + self.rateLimit;
var obj = self.dequeue();
var next = obj.next;
var limiter = obj.limiter;
setTimeout(function(){
var done = function() {
--self._tasksRunning;
self._tryToRun();
}
next(done, limiter);
}, wait);
}
return;
}
Bottleneck.prototype.hasWaitingClients = function() {
if(this._waitingClients.size()) {
return true;
}
if(this.cluster && this.cluster._waitingClients()) {
return true;
}
return false;
}
Bottleneck.prototype.dequeue = function() {
if(this._waitingClients.size()) {
return {
next: this._waitingClients.dequeue(),
limiter: null
};
}
return this.cluster.dequeue(this.name);
}
Bottleneck.Cluster = Bottleneck.prototype.Cluster = require("./Cluster");
module.exports = Bottleneck;