server.js 4.38 KB
const express = require('express');
const fs = require('fs').promises;
const multer = require('multer');
const redis = require('redis');
const { promisify } = require('util');
const { readM3U8 } = require('./utils');

const RTMP_INPUT_FOLDER = '/root/hls/test_720p2628kbs';

const app = express();
const client = redis.createClient();
const upload = multer({
  storage: multer.diskStorage({
    destination: (req, file, cb) => cb(null, __dirname + '/public/live'),
    filename: (req, file, cb) => cb(null, file.originalname),
  }),
});

const sleep = (ms) => new Promise(r => setTimeout(r, ms));
const delAsync = promisify(client.del).bind(client);
const getAsync = promisify(client.get).bind(client);
const setAsync = promisify(client.set).bind(client);
const rpushAsync = promisify(client.rpush).bind(client);
const lpopAsync = promisify(client.lpop).bind(client);
const lrangeAsync = promisify(client.lrange).bind(client);
const saddAsync = promisify(client.sadd).bind(client);
const sismemberAsync = promisify(client.sismember).bind(client);
const sremAsync = promisify(client.srem).bind(client);

app.use(express.static(__dirname + '/public'));

app.get('/live.m3u8', async (req, res) => {
  const EXT_X_TARGETDURATION = await getAsync('HLS_EXT_X_TARGETDURATION');
  const PLAYLIST = await lrangeAsync('HLS_COMPLETE', 0, -1);
  const data = `#EXTM3U
#EXT-X-PLAYLIST-TYPE:EVENT
#EXT-X-TARGETDURATION:${EXT_X_TARGETDURATION}
#EXT-X-VERSION:3
#EXT-X-MEDIA-SEQUENCE:0
${PLAYLIST.map(x => `#EXTINF:${x.split('/')[1]},\nlive/${x.split('/')[0]}\n`).join('')}`;
  res.set('content-type', 'audio/mpegurl');
  return res.send(data);
});

app.get('/origin.m3u8', async (req, res) => {
  const EXT_X_TARGETDURATION = await getAsync('HLS_EXT_X_TARGETDURATION');
  const PLAYLIST = await lrangeAsync('HLS_ORIGINAL', 0, -1);
  const data = `#EXTM3U
#EXT-X-PLAYLIST-TYPE:EVENT
#EXT-X-TARGETDURATION:${EXT_X_TARGETDURATION}
#EXT-X-VERSION:3
#EXT-X-MEDIA-SEQUENCE:0
${PLAYLIST.map(x => `#EXTINF:${x.split('/')[1]},\norigin/${x.split('/')[0]}\n`).join('')}`;
  res.set('content-type', 'audio/mpegurl');
  return res.send(data);
});

app.get('/check', async (req, res) => {
  const data = await lpopAsync('HLS_WAITING');
  if (data === null) {
    return res.json({ data: null });
  }
  await rpushAsync('HLS_PROGRESS', data);
  return res.json({ data: data.split('/')[0] });
});

app.post('/upload', upload.single('file'), async (req, res) => {
  await saddAsync('HLS_UPLOADED', req.file.originalname);
  return res.json({ success: true });
});

app.use((req, res, next) => {
  return res.status(404).json({ error: 'Not Found' });
});

app.use((err, req, res, next) => {
  return res.status(500).json({ error: 'Internal Server Error' });
});

app.listen(3000);

const main = async () => {
  for (;;) {
    try {
      const m3u8 = await readM3U8(`${RTMP_INPUT_FOLDER}/index.m3u8`);
      const lastSeq = await getAsync('HLS_EXT_X_MEDIA_SEQUENCE');
      if (m3u8.EXT_X_MEDIA_SEQUENCE !== lastSeq) {
        if (m3u8.EXT_X_MEDIA_SEQUENCE === '0') {
          await delAsync('HLS_WAITING');
          await delAsync('HLS_PROGRESS');
          await delAsync('HLS_UPLOADED');
          await delAsync('HLS_COMPLETE');
          await delAsync('HLS_ORIGINAL'); // TODO: debug
          await fs.rmdir(__dirname + '/public/origin/', { recursive: true });
          await fs.rmdir(__dirname + '/public/live/', { recursive: true });
          await fs.mkdir(__dirname + '/public/origin/');
          await fs.mkdir(__dirname + '/public/live/');
          await setAsync('HLS_EXT_X_TARGETDURATION', m3u8.EXT_X_TARGETDURATION);
        }
        const item = m3u8.PLAYLIST.pop();
        await fs.copyFile(`${RTMP_INPUT_FOLDER}/${item.file}`, __dirname + `/public/origin/${item.file}`);
        await rpushAsync('HLS_WAITING', `${item.file}/${item.time}`);
        await rpushAsync('HLS_ORIGINAL', `${item.file}/${item.time}`); // TODO: debug
        await setAsync('HLS_EXT_X_MEDIA_SEQUENCE', m3u8.EXT_X_MEDIA_SEQUENCE);
      }
      const first_of_progress = await lrangeAsync('HLS_PROGRESS', 0, 0);
      if (first_of_progress.length !== 0 && await sismemberAsync('HLS_UPLOADED', first_of_progress[0].split('/')[0]) === 1) {
        await rpushAsync('HLS_COMPLETE', first_of_progress[0]);
        await lpopAsync('HLS_PROGRESS');
        await sremAsync('HLS_UPLOADED', first_of_progress[0].split('/')[0]);
      }
    } catch (e) {}
    await sleep(1000);
  }
}

main().catch(err => console.error(err));