asyncbox.js
3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
// transpile:main
import B from 'bluebird';
import { mapify } from 'es6-mapify';
import _ from 'lodash';
/**
 * Pause for the given number of milliseconds.
 *
 * @param {number} ms - how long to wait, handed straight to Bluebird's `delay`
 * @returns {Promise<*>} settles with whatever `B.delay(ms)` settles with
 */
async function sleep (ms) {
  const outcome = await B.delay(ms);
  return outcome;
}
/**
 * Call `fn(...args)` until it succeeds, giving up after `times` attempts.
 *
 * @param {number} times - maximum number of attempts
 * @param {Function} fn - function to invoke; it is awaited, so it may be async
 * @param {...*} args - arguments forwarded to `fn` on every attempt
 * @returns {Promise<*>} the value of the first successful attempt, or `null`
 * if no attempt was made at all (e.g. `times` is 0)
 * @throws whatever the final attempt threw
 */
async function retry (times, fn, ...args) {
  for (let attempt = 0; attempt < times; attempt++) {
    try {
      return await fn(...args);
    } catch (err) {
      // only the failure of the very last attempt is surfaced to the caller
      if (attempt + 1 >= times) {
        throw err;
      }
    }
  }
  return null;
}
/**
 * Like `retry`, but waits `sleepMs` milliseconds between failed attempts.
 *
 * @param {number} times - maximum number of attempts
 * @param {number} sleepMs - pause after each failed attempt except the last
 * @param {Function} fn - function to invoke; it is awaited
 * @param {...*} args - arguments forwarded to `fn` on every attempt
 * @returns {Promise<*>} whatever `retry` resolves to for the wrapped function
 */
async function retryInterval (times, sleepMs, fn, ...args) {
  let attempts = 0;

  // Wraps `fn` so the pause happens before the failure is reported to `retry`
  async function attemptOnce () {
    attempts += 1;
    try {
      return await fn(...args);
    } catch (err) {
      // do not pause when finished the last retry
      const isLastAttempt = attempts === times;
      if (!isLastAttempt) {
        await sleep(sleepMs);
      }
      throw err;
    }
  }

  return await retry(times, attemptOnce);
}
/**
 * Wait for a whole collection of promises via Bluebird's `all`.
 *
 * @param {Array} promises - collection handed to `B.all`
 * @returns {Promise<Array>} whatever `B.all(promises)` resolves to
 */
async function parallel (promises) {
  const results = await B.all(promises);
  return results;
}
/**
 * Attach a callback to a promise (or plain value) through Bluebird's
 * `nodeify`.
 *
 * @param {*} promisey - promise or value, wrapped with `B.resolve`
 * @param {Function} cb - callback handed to `.nodeify`
 * @returns the result of the `.nodeify(cb)` call
 */
function nodeify (promisey, cb) { // eslint-disable-line promise/prefer-await-to-callbacks
  const wrapped = B.resolve(promisey);
  return wrapped.nodeify(cb);
}
/**
 * Build callback-style counterparts for a collection of promise-returning
 * functions.
 *
 * Each generated function treats its LAST argument as the callback, calls the
 * original function with the remaining arguments, and routes the outcome to
 * that callback through `nodeify`.
 *
 * @param {Object} promiseyMap - collection of functions; iterated as
 * [name, fn] pairs via `mapify`
 * @returns {Object} plain object with one callback-style function per name
 */
function nodeifyAll (promiseyMap) {
  const callbackified = {};
  for (const [key, promiseFn] of mapify(promiseyMap)) {
    callbackified[key] = function (...params) {
      // the trailing argument is the callback; everything before it goes to promiseFn
      const done = params.pop();
      nodeify(promiseFn(...params), done);
    };
  }
  return callbackified;
}
/**
 * Invoke `fn(...args)` without returning anything to the caller; the outcome
 * is wrapped in a Bluebird promise and the chain is closed with `.done()`.
 *
 * @param {Function} fn - function to invoke
 * @param {...*} args - arguments forwarded to `fn`
 */
function asyncify (fn, ...args) {
  const outcome = fn(...args);
  B.resolve(outcome).done();
}
/**
 * Map a collection through a (possibly async) mapper.
 *
 * In parallel mode the mapper is applied with `coll.map`, so it also receives
 * the index and the collection, and all results are awaited together via
 * `parallel`. In sequential mode items are processed one at a time, in order,
 * and the mapper receives only the item.
 *
 * @param {Array} coll - items to map
 * @param {Function} mapper - mapping function; its result is awaited
 * @param {boolean} [runInParallel=true] - run all mappers at once or one by one
 * @returns {Promise<Array>} mapped values
 */
async function asyncmap (coll, mapper, runInParallel = true) {
  if (!runInParallel) {
    const mapped = [];
    for (const entry of coll) {
      mapped.push(await mapper(entry));
    }
    return mapped;
  }
  return parallel(coll.map(mapper));
}
/**
 * Filter a collection with a (possibly async) predicate.
 *
 * In parallel mode the predicate is applied with `coll.map` and all verdicts
 * are awaited together via `parallel`; in sequential mode items are tested one
 * at a time, in order. Either way the kept items preserve their original
 * order.
 *
 * @param {Array} coll - items to filter
 * @param {Function} filter - predicate; its result is awaited and tested for truthiness
 * @param {boolean} [runInParallel=true] - run all predicates at once or one by one
 * @returns {Promise<Array>} the items whose predicate result was truthy
 */
async function asyncfilter (coll, filter, runInParallel = true) {
  const kept = [];
  if (!runInParallel) {
    for (const candidate of coll) {
      const keep = await filter(candidate);
      if (keep) {
        kept.push(candidate);
      }
    }
    return kept;
  }
  const verdicts = await parallel(coll.map(filter));
  for (let idx = 0; idx < coll.length; idx++) {
    if (!verdicts[idx]) {
      continue;
    }
    kept.push(coll[idx]);
  }
  return kept;
}
/**
 * Poll `condFn` until it returns a truthy value or the time budget runs out.
 *
 * @param {Function} condFn - condition to evaluate; its result is awaited
 * @param {Object} [opts] - options; this object is only read, never modified
 * @param {number} [opts.waitMs=5000] - total time budget in milliseconds
 * @param {number} [opts.intervalMs=500] - pause between two evaluations
 * @param {Object} [opts.logger] - if given, its `debug` method receives a
 * progress message before every pause
 * @param {string|Error} [opts.error] - custom message or error to throw on
 * timeout
 * @returns {Promise<*>} the first truthy value returned by `condFn`
 * @throws {Error} `opts.error` (wrapped in an Error if it is a string) or a
 * default timeout error once `waitMs` has elapsed without a truthy result
 */
async function waitForCondition (condFn, opts = {}) {
  // Apply defaults through destructuring so the caller's object is left untouched
  const {waitMs = 5000, intervalMs = 500, logger, error} = opts;
  const debug = logger ? logger.debug.bind(logger) : () => {};
  const begunAt = Date.now();
  const endAt = begunAt + waitMs;
  // A loop instead of recursion: long waits must not pile up nested pending promises
  for (;;) {
    const result = await condFn();
    if (result) {
      return result;
    }
    const now = Date.now();
    const waited = now - begunAt;
    if (now < endAt) {
      debug(`Waited for ${waited} ms so far`);
      // never sleep past the deadline, so the overall wait honours waitMs
      await B.delay(Math.min(intervalMs, endAt - now));
      continue;
    }
    // if there is an error option, it is either a string message or an error itself
    throw error
      ? (_.isString(error) ? new Error(error) : error)
      : new Error(`Condition unmet after ${waited} ms. Timing out.`);
  }
}
// Named exports of this module.
export {
sleep, retry, nodeify, nodeifyAll, retryInterval, asyncify, parallel,
asyncmap, asyncfilter, waitForCondition
};