server.js
4.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
const express = require('express');
const fs = require('fs').promises;
const multer = require('multer');
const redis = require('redis');
const { promisify } = require('util');
const { readM3U8 } = require('./utils');
const RTMP_INPUT_FOLDER = '/root/hls/test_720p2628kbs';
const app = express();
const client = redis.createClient();
const upload = multer({
storage: multer.diskStorage({
destination: (req, file, cb) => cb(null, __dirname + '/public/live'),
filename: (req, file, cb) => cb(null, file.originalname),
}),
});
const sleep = (ms) => new Promise(r => setTimeout(r, ms));
const delAsync = promisify(client.del).bind(client);
const getAsync = promisify(client.get).bind(client);
const setAsync = promisify(client.set).bind(client);
const rpushAsync = promisify(client.rpush).bind(client);
const lpopAsync = promisify(client.lpop).bind(client);
const lrangeAsync = promisify(client.lrange).bind(client);
const saddAsync = promisify(client.sadd).bind(client);
const sismemberAsync = promisify(client.sismember).bind(client);
const sremAsync = promisify(client.srem).bind(client);
app.use(express.static(__dirname + '/public'));
app.get('/live.m3u8', async (req, res) => {
const EXT_X_TARGETDURATION = await getAsync('HLS_EXT_X_TARGETDURATION');
const PLAYLIST = await lrangeAsync('HLS_COMPLETE', 0, -1);
const data = `#EXTM3U
#EXT-X-PLAYLIST-TYPE:EVENT
#EXT-X-TARGETDURATION:${EXT_X_TARGETDURATION}
#EXT-X-VERSION:3
#EXT-X-MEDIA-SEQUENCE:0
${PLAYLIST.map(x => `#EXTINF:${x.split('/')[1]},\nlive/${x.split('/')[0]}\n`).join('')}`;
res.set('content-type', 'audio/mpegurl');
return res.send(data);
});
app.get('/origin.m3u8', async (req, res) => {
const EXT_X_TARGETDURATION = await getAsync('HLS_EXT_X_TARGETDURATION');
const PLAYLIST = await lrangeAsync('HLS_ORIGINAL', 0, -1);
const data = `#EXTM3U
#EXT-X-PLAYLIST-TYPE:EVENT
#EXT-X-TARGETDURATION:${EXT_X_TARGETDURATION}
#EXT-X-VERSION:3
#EXT-X-MEDIA-SEQUENCE:0
${PLAYLIST.map(x => `#EXTINF:${x.split('/')[1]},\norigin/${x.split('/')[0]}\n`).join('')}`;
res.set('content-type', 'audio/mpegurl');
return res.send(data);
});
app.get('/check', async (req, res) => {
const data = await lpopAsync('HLS_WAITING');
if (data === null) {
return res.json({ data: null });
}
await rpushAsync('HLS_PROGRESS', data);
return res.json({ data: data.split('/')[0] });
});
app.post('/upload', upload.single('file'), async (req, res) => {
await saddAsync('HLS_UPLOADED', req.file.originalname);
return res.json({ success: true });
});
app.use((req, res, next) => {
return res.status(404).json({ error: 'Not Found' });
});
app.use((err, req, res, next) => {
return res.status(500).json({ error: 'Internal Server Error' });
});
app.listen(3000);
const main = async () => {
for (;;) {
try {
const m3u8 = await readM3U8(`${RTMP_INPUT_FOLDER}/index.m3u8`);
const lastSeq = await getAsync('HLS_EXT_X_MEDIA_SEQUENCE');
if (m3u8.EXT_X_MEDIA_SEQUENCE !== lastSeq) {
if (m3u8.EXT_X_MEDIA_SEQUENCE === '0') {
await delAsync('HLS_WAITING');
await delAsync('HLS_PROGRESS');
await delAsync('HLS_UPLOADED');
await delAsync('HLS_COMPLETE');
await delAsync('HLS_ORIGINAL'); // TODO: debug
await fs.rmdir(__dirname + '/public/origin/', { recursive: true });
await fs.rmdir(__dirname + '/public/live/', { recursive: true });
await fs.mkdir(__dirname + '/public/origin/');
await fs.mkdir(__dirname + '/public/live/');
await setAsync('HLS_EXT_X_TARGETDURATION', m3u8.EXT_X_TARGETDURATION);
}
const item = m3u8.PLAYLIST.pop();
await fs.copyFile(`${RTMP_INPUT_FOLDER}/${item.file}`, __dirname + `/public/origin/${item.file}`);
await rpushAsync('HLS_WAITING', `${item.file}/${item.time}`);
await rpushAsync('HLS_ORIGINAL', `${item.file}/${item.time}`); // TODO: debug
await setAsync('HLS_EXT_X_MEDIA_SEQUENCE', m3u8.EXT_X_MEDIA_SEQUENCE);
}
const first_of_progress = await lrangeAsync('HLS_PROGRESS', 0, 0);
if (first_of_progress.length !== 0 && await sismemberAsync('HLS_UPLOADED', first_of_progress[0].split('/')[0]) === 1) {
await rpushAsync('HLS_COMPLETE', first_of_progress[0]);
await lpopAsync('HLS_PROGRESS');
await sremAsync('HLS_UPLOADED', first_of_progress[0].split('/')[0]);
}
} catch (e) {}
await sleep(1000);
}
}
main().catch(err => console.error(err));