ShardingManager.js
11.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
'use strict';
const EventEmitter = require('events');
const fs = require('fs');
const path = require('path');
const Shard = require('./Shard');
const { Error, TypeError, RangeError } = require('../errors');
const Collection = require('../util/Collection');
const Util = require('../util/Util');
/**
* This is a utility class that makes multi-process sharding of a bot an easy and painless experience.
* It works by spawning a self-contained {@link ChildProcess} or {@link Worker} for each individual shard, each
* containing its own instance of your bot's {@link Client}. They all have a line of communication with the master
* process, and there are several useful methods that utilise it in order to simplify tasks that are normally difficult
* with sharding. It can spawn a specific number of shards or the amount that Discord suggests for the bot, and takes a
* path to your main bot script to launch for each one.
* @extends {EventEmitter}
*/
class ShardingManager extends EventEmitter {
  /**
   * The mode to spawn shards with for a {@link ShardingManager}: either "process" to use child processes, or
   * "worker" to use [Worker threads](https://nodejs.org/api/worker_threads.html).
   * @typedef {string} ShardingManagerMode
   */

  /**
   * @param {string} file Path to your shard script file
   * @param {Object} [options] Options for the sharding manager
   * @param {string|number} [options.totalShards='auto'] Number of total shards of all shard managers or "auto"
   * @param {string|number[]} [options.shardList='auto'] List of shards to spawn or "auto"
   * @param {ShardingManagerMode} [options.mode='process'] Which mode to use for shards
   * @param {boolean} [options.respawn=true] Whether shards should automatically respawn upon exiting
   * @param {string[]} [options.shardArgs=[]] Arguments to pass to the shard script when spawning
   * (only available when using the `process` mode)
   * @param {string[]} [options.execArgv=[]] Arguments to pass to the shard script executable when spawning
   * (only available when using the `process` mode)
   * @param {string} [options.token] Token to use for automatic shard count and passing to shards
   * (falls back to the `DISCORD_TOKEN` environment variable)
   */
  constructor(file, options = {}) {
    super();

    const settings = Util.mergeDefault(
      {
        totalShards: 'auto',
        mode: 'process',
        respawn: true,
        shardArgs: [],
        execArgv: [],
        token: process.env.DISCORD_TOKEN,
      },
      options,
    );

    if (!file) throw new Error('CLIENT_INVALID_OPTION', 'File', 'specified.');

    /**
     * Path to the shard script file (relative paths are resolved against the current working directory)
     * @type {string}
     */
    this.file = path.isAbsolute(file) ? file : path.resolve(process.cwd(), file);
    // statSync throws by itself when the path does not exist; only the "exists but is not a file" case is handled here
    if (!fs.statSync(this.file).isFile()) throw new Error('CLIENT_INVALID_OPTION', 'File', 'a file');

    /**
     * List of shards this sharding manager spawns
     * @type {string|number[]}
     */
    this.shardList = settings.shardList || 'auto';
    if (this.shardList !== 'auto') {
      if (!Array.isArray(this.shardList)) {
        throw new TypeError('CLIENT_INVALID_OPTION', 'shardList', 'an array.');
      }
      // Duplicate IDs are dropped, keeping the first occurrence of each
      this.shardList = [...new Set(this.shardList)];
      if (this.shardList.length < 1) throw new RangeError('CLIENT_INVALID_OPTION', 'shardList', 'at least 1 ID.');
      // Number.isInteger is false for non-numbers and NaN, so it covers the type check as well
      if (this.shardList.some(shardID => !Number.isInteger(shardID) || shardID < 0)) {
        throw new TypeError('CLIENT_INVALID_OPTION', 'shardList', 'an array of positive integers.');
      }
    }

    /**
     * Amount of shards that all sharding managers spawn in total, or "auto" until it has been determined
     * @type {number|string}
     */
    this.totalShards = settings.totalShards || 'auto';
    if (this.totalShards !== 'auto') {
      if (typeof this.totalShards !== 'number' || Number.isNaN(this.totalShards)) {
        throw new TypeError('CLIENT_INVALID_OPTION', 'Amount of shards', 'a number.');
      }
      if (this.totalShards < 1) throw new RangeError('CLIENT_INVALID_OPTION', 'Amount of shards', 'at least 1.');
      if (!Number.isInteger(this.totalShards)) {
        throw new RangeError('CLIENT_INVALID_OPTION', 'Amount of shards', 'an integer.');
      }
    }

    /**
     * Mode for shards to spawn with
     * @type {ShardingManagerMode}
     */
    this.mode = settings.mode;
    if (this.mode !== 'process' && this.mode !== 'worker') {
      throw new RangeError('CLIENT_INVALID_OPTION', 'Sharding mode', '"process" or "worker"');
    }

    /**
     * Whether shards should automatically respawn upon exiting
     * @type {boolean}
     */
    this.respawn = settings.respawn;

    /**
     * An array of arguments to pass to shards (only when {@link ShardingManager#mode} is `process`)
     * @type {string[]}
     */
    this.shardArgs = settings.shardArgs;

    /**
     * An array of arguments to pass to the executable (only when {@link ShardingManager#mode} is `process`)
     * @type {string[]}
     */
    this.execArgv = settings.execArgv;

    /**
     * Token to use for obtaining the automatic shard count, and passing to shards
     * (stored without a leading "Bot" prefix)
     * @type {?string}
     */
    this.token = settings.token ? settings.token.replace(/^Bot\s*/i, '') : null;

    /**
     * A collection of shards that this manager has spawned
     * @type {Collection<number, Shard>}
     */
    this.shards = new Collection();

    process.env.SHARDING_MANAGER = true;
    process.env.SHARDING_MANAGER_MODE = this.mode;
    // Values assigned to process.env are converted to strings, so assigning a missing (null) token would
    // publish the literal string "null". Remove the variable instead when there is no token.
    if (this.token) {
      process.env.DISCORD_TOKEN = this.token;
    } else {
      delete process.env.DISCORD_TOKEN;
    }
  }

  /**
   * Creates a single shard.
   * <warn>Using this method is usually not necessary if you use the spawn method.</warn>
   * @param {number} [id=this.shards.size] ID of the shard to create
   * <info>This is usually not necessary to manually specify.</info>
   * @returns {Shard} Note that the created shard needs to be explicitly spawned using its spawn method.
   */
  createShard(id = this.shards.size) {
    const shard = new Shard(this, id);
    this.shards.set(id, shard);
    /**
     * Emitted upon creating a shard.
     * @event ShardingManager#shardCreate
     * @param {Shard} shard Shard that was created
     */
    this.emit('shardCreate', shard);
    return shard;
  }

  /**
   * Spawns multiple shards.
   * @param {number|string} [amount=this.totalShards] Number of shards to spawn
   * @param {number} [delay=5500] How long to wait in between spawning each shard (in milliseconds)
   * @param {number} [spawnTimeout=30000] The amount in milliseconds to wait until the {@link Client} has become ready
   * before resolving. (-1 or Infinity for no wait)
   * @returns {Promise<Collection<number, Shard>>}
   */
  async spawn(amount = this.totalShards, delay = 5500, spawnTimeout) {
    // Obtain/verify the number of shards to spawn
    let total = amount;
    if (total === 'auto') {
      total = await Util.fetchRecommendedShards(this.token);
    } else {
      if (typeof total !== 'number' || Number.isNaN(total)) {
        throw new TypeError('CLIENT_INVALID_OPTION', 'Amount of shards', 'a number.');
      }
      if (total < 1) throw new RangeError('CLIENT_INVALID_OPTION', 'Amount of shards', 'at least 1.');
      if (!Number.isInteger(total)) {
        throw new TypeError('CLIENT_INVALID_OPTION', 'Amount of shards', 'an integer.');
      }
    }

    // Make sure this many shards haven't already been spawned
    if (this.shards.size >= total) throw new Error('SHARDING_ALREADY_SPAWNED', this.shards.size);

    // A configured shard list is only kept when it was given together with a matching total;
    // otherwise every ID from 0 to total - 1 is spawned ('auto' never equals a number).
    if (this.shardList === 'auto' || this.totalShards !== total) {
      this.shardList = [...Array(total).keys()];
    }
    this.totalShards = total;

    if (this.shardList.some(shardID => shardID >= total)) {
      throw new RangeError(
        'CLIENT_INVALID_OPTION',
        'Amount of shards',
        'bigger than the highest shardID in the shardList option.',
      );
    }

    // Spawn the shards one after another
    for (const shardID of this.shardList) {
      const shard = this.createShard(shardID);
      const pending = [shard.spawn(spawnTimeout)];
      // No delay is needed once the last shard of the list has been created
      if (delay > 0 && this.shards.size !== this.shardList.length) pending.push(Util.delayFor(delay));
      await Promise.all(pending); // eslint-disable-line no-await-in-loop
    }

    return this.shards;
  }

  /**
   * Sends a message to all shards.
   * @param {*} message Message to be sent to the shards
   * @returns {Promise<Shard[]>}
   */
  broadcast(message) {
    const promises = [];
    for (const shard of this.shards.values()) promises.push(shard.send(message));
    return Promise.all(promises);
  }

  /**
   * Evaluates a script on all shards, or a given shard, in the context of the {@link Client}s.
   * @param {string} script JavaScript to run on each shard
   * @param {number} [shard] Shard to run on, all if undefined
   * @returns {Promise<*>|Promise<Array<*>>} Results of the script execution
   */
  broadcastEval(script, shard) {
    return this._performOnShards('eval', [script], shard);
  }

  /**
   * Fetches a client property value of each shard, or a given shard.
   * @param {string} prop Name of the client property to get, using periods for nesting
   * @param {number} [shard] Shard to fetch property from, all if undefined
   * @returns {Promise<*>|Promise<Array<*>>}
   * @example
   * manager.fetchClientValues('guilds.cache.size')
   *   .then(results => console.log(`${results.reduce((prev, val) => prev + val, 0)} total guilds`))
   *   .catch(console.error);
   */
  fetchClientValues(prop, shard) {
    return this._performOnShards('fetchClientValue', [prop], shard);
  }

  /**
   * Runs a method with given arguments on all shards, or a given shard.
   * Rejects when no shard exists yet, when not every shard of the shard list has been created,
   * or when the requested shard ID is unknown.
   * @param {string} method Method name to run on each shard
   * @param {Array<*>} args Arguments to pass through to the method call
   * @param {number} [shard] Shard to run on, all if undefined
   * @returns {Promise<*>|Promise<Array<*>>} Results of the method execution
   * @private
   */
  _performOnShards(method, args, shard) {
    if (this.shards.size === 0) return Promise.reject(new Error('SHARDING_NO_SHARDS'));
    if (this.shards.size !== this.shardList.length) return Promise.reject(new Error('SHARDING_IN_PROCESS'));

    if (typeof shard === 'number') {
      if (this.shards.has(shard)) return this.shards.get(shard)[method](...args);
      return Promise.reject(new Error('SHARDING_SHARD_NOT_FOUND', shard));
    }

    const promises = [];
    for (const sh of this.shards.values()) promises.push(sh[method](...args));
    return Promise.all(promises);
  }

  /**
   * Kills all running shards and respawns them.
   * @param {number} [shardDelay=5000] How long to wait between shards (in milliseconds)
   * @param {number} [respawnDelay=500] How long to wait between killing a shard's process and restarting it
   * (in milliseconds)
   * @param {number} [spawnTimeout=30000] The amount in milliseconds to wait for a shard to become ready before
   * continuing to another. (-1 or Infinity for no wait)
   * @returns {Promise<Collection<number, Shard>>}
   */
  async respawnAll(shardDelay = 5000, respawnDelay = 500, spawnTimeout) {
    let handled = 0;
    for (const shard of this.shards.values()) {
      const pending = [shard.respawn(respawnDelay, spawnTimeout)];
      handled += 1;
      // Skip the inter-shard delay after the final shard
      if (handled < this.shards.size && shardDelay > 0) pending.push(Util.delayFor(shardDelay));
      await Promise.all(pending); // eslint-disable-line no-await-in-loop
    }
    return this.shards;
  }
}
module.exports = ShardingManager;