서승완

feat: add mosaic-server and mosaic-client

import json
import os
import requests
import subprocess
import time
import cv2
import torch
from models.experimental import attempt_load
from utils.datasets import LoadImages
from utils.general import check_img_size, non_max_suppression, set_logging, scale_coords
from utils.torch_utils import select_device, time_synchronized
SERVER_CHECK_ENDPOINT = 'http://mosaic.khunet.net'
WEIGHT_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'weight.pt')
INPUT_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'input.mp4')
OUTPUT_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'output.ts')
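# weight.pt, input.mp4 and output.ts sit next to this script; detect() below
# assumes the downloaded segment is 1280x720 at 30 fps (hard-coded in the ffmpeg raw-video pipe).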
def download(url, file_path):
    with open(file_path, 'wb') as file:
        res = requests.get(url)
        file.write(res.content)
def mosaic(src, ratio=0.07):
    small = cv2.resize(src, None, fx=ratio, fy=ratio)
    return cv2.resize(small, src.shape[:2][::-1], interpolation=cv2.INTER_NEAREST)
@torch.no_grad()
def detect(weight_path, input_path, output_path):
    command = ['ffmpeg',
               '-loglevel', 'panic',
               '-y',
               '-f', 'rawvideo',
               '-pixel_format', 'bgr24',
               '-video_size', "{}x{}".format(1280, 720),
               '-framerate', str(30),
               '-i', '-',
               '-i', input_path,
               '-c:a', 'copy',
               '-map', '0:v:0',
               '-map', '1:a:0',
               '-c:v', 'libx264',
               '-pix_fmt', 'yuv420p',
               '-preset', 'ultrafast',
               output_path]
    writer = subprocess.Popen(command, stdin=subprocess.PIPE)

    source, weights, imgsz = input_path, weight_path, 640

    # Initialize
    set_logging()
    device = select_device('')

    # Load model
    model = attempt_load(weights, map_location=device)  # load FP32 model
    stride = int(model.stride.max())  # model stride
    imgsz = check_img_size(imgsz, s=stride)  # check img_size
    names = model.module.names if hasattr(model, 'module') else model.names  # get class names

    # Set Dataloader
    dataset = LoadImages(source, img_size=imgsz, stride=stride)

    # Run inference
    if device.type != 'cpu':
        model(torch.zeros(1, 3, imgsz, imgsz).to(device).type_as(next(model.parameters())))  # run once
    t0 = time.time()
    for path, img, im0s, vid_cap in dataset:
        img = torch.from_numpy(img).to(device)
        img = img.float()  # uint8 to fp16/32
        img /= 255.0  # 0 - 255 to 0.0 - 1.0
        if img.ndimension() == 3:
            img = img.unsqueeze(0)

        # Inference
        t1 = time_synchronized()
        pred = model(img, augment=False)[0]

        # Apply NMS
        pred = non_max_suppression(pred, max_det=1000)
        t2 = time_synchronized()

        # Process detections
        for i, det in enumerate(pred):  # detections per image
            p, s, im0, frame = path, '', im0s.copy(), getattr(dataset, 'frame', 0)
            s += '%gx%g ' % img.shape[2:]  # print string
            if len(det):
                # Rescale boxes from img_size to im0 size
                det[:, :4] = scale_coords(img.shape[2:], det[:, :4], im0.shape).round()

                # Print results
                for c in det[:, -1].unique():
                    n = (det[:, -1] == c).sum()  # detections per class
                    s += f"{n} {names[int(c)]}{'s' * (n > 1)}, "  # add to string

                # Write results
                for *xyxy, conf, cls in reversed(det):
                    x1, y1, x2, y2 = int(xyxy[0]), int(xyxy[1]), int(xyxy[2]), int(xyxy[3])
                    src = im0[y1:y2, x1:x2]
                    dst = im0.copy()
                    dst[y1:y2, x1:x2] = mosaic(src)
                    im0 = dst

            # Print time (inference + NMS)
            # print(f'{s}Done. ({t2 - t1:.3f}s)')

            # Save results (image with detections)
            writer.stdin.write(im0.tobytes())

    writer.stdin.close()
    writer.wait()
    print(f'Done. ({time.time() - t0:.3f}s)')
if __name__ == '__main__':
    while True:
        try:
            response = requests.get(SERVER_CHECK_ENDPOINT + '/check')
            data = json.loads(response.text)
            if data['data'] is not None:
                download(SERVER_CHECK_ENDPOINT + '/origin/' + data['data'], INPUT_PATH)
                detect(WEIGHT_PATH, INPUT_PATH, OUTPUT_PATH)
                with open(OUTPUT_PATH, 'rb') as file:
                    response = requests.post(SERVER_CHECK_ENDPOINT + '/upload',
                                             files={'file': (data['data'], file)})
                print(data['data'] + ' : ' + response.text)
        except Exception as e:
            print('Error!', e)
        time.sleep(0.5)
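For reference, the frame-piping pattern used in detect() can be exercised on its own, without the model: ffmpeg reads raw BGR frames from stdin as input 0 and copies the audio track of the original clip as input 1. A minimal sketch, assuming a hypothetical local sample.mp4 that is 1280x720 at 30 fps and carries an audio track:

import subprocess
import cv2

INPUT = 'sample.mp4'   # hypothetical test clip, assumed 1280x720 @ 30 fps with audio
OUTPUT = 'piped.ts'

command = ['ffmpeg', '-y',
           '-f', 'rawvideo', '-pixel_format', 'bgr24',
           '-video_size', '1280x720', '-framerate', '30', '-i', '-',  # input 0: frames from stdin
           '-i', INPUT,                                               # input 1: original file (audio source)
           '-map', '0:v:0', '-map', '1:a:0',
           '-c:v', 'libx264', '-pix_fmt', 'yuv420p', '-c:a', 'copy',
           OUTPUT]
writer = subprocess.Popen(command, stdin=subprocess.PIPE)

cap = cv2.VideoCapture(INPUT)
while True:
    ok, frame = cap.read()                   # frame is a BGR numpy array
    if not ok:
        break
    frame = cv2.resize(frame, (1280, 720))   # keep the size ffmpeg was told to expect
    writer.stdin.write(frame.tobytes())      # any per-frame edit (e.g. mosaic) would go before this
cap.release()
writer.stdin.close()
writer.wait()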
{
  "name": "mosaic-server",
  "version": "0.0.0",
  "dependencies": {
    "express": "^4.17.1",
    "multer": "^1.4.2",
    "redis": "^3.1.2"
  }
}
const express = require('express');
const fs = require('fs').promises;
const multer = require('multer');
const redis = require('redis');
const { promisify } = require('util');
const { readM3U8 } = require('./utils');
const RTMP_INPUT_FOLDER = '/root/hls/test_720p2628kbs';
const app = express();
const client = redis.createClient();
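// Redis keys used below (values are '<segment file>/<duration>' strings unless noted):
//   HLS_WAITING   - list of segments copied from the RTMP output, waiting for a worker
//   HLS_PROGRESS  - list of segments handed to a worker via /check
//   HLS_UPLOADED  - set of segment file names the worker has POSTed back to /upload
//   HLS_COMPLETE  - list of processed segments, served by /live.m3u8 from public/live
//   HLS_ORIGINAL  - list of original segments, served by /origin.m3u8 from public/origin (debug)
//   HLS_EXT_X_MEDIA_SEQUENCE / HLS_EXT_X_TARGETDURATION - scalars mirrored from the source playlist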
const upload = multer({
  storage: multer.diskStorage({
    destination: (req, file, cb) => cb(null, __dirname + '/public/live'),
    filename: (req, file, cb) => cb(null, file.originalname),
  }),
});
const sleep = (ms) => new Promise(r => setTimeout(r, ms));
const delAsync = promisify(client.del).bind(client);
const getAsync = promisify(client.get).bind(client);
const setAsync = promisify(client.set).bind(client);
const rpushAsync = promisify(client.rpush).bind(client);
const lpopAsync = promisify(client.lpop).bind(client);
const lrangeAsync = promisify(client.lrange).bind(client);
const saddAsync = promisify(client.sadd).bind(client);
const sismemberAsync = promisify(client.sismember).bind(client);
const sremAsync = promisify(client.srem).bind(client);
app.use(express.static(__dirname + '/public'));
app.get('/live.m3u8', async (req, res) => {
  const EXT_X_TARGETDURATION = await getAsync('HLS_EXT_X_TARGETDURATION');
  const PLAYLIST = await lrangeAsync('HLS_COMPLETE', 0, -1);
  const data = `#EXTM3U
#EXT-X-PLAYLIST-TYPE:EVENT
#EXT-X-TARGETDURATION:${EXT_X_TARGETDURATION}
#EXT-X-VERSION:3
#EXT-X-MEDIA-SEQUENCE:0
${PLAYLIST.map(x => `#EXTINF:${x.split('/')[1]},\nlive/${x.split('/')[0]}\n`).join('')}`;
  res.set('content-type', 'audio/mpegurl');
  return res.send(data);
});
app.get('/origin.m3u8', async (req, res) => {
  const EXT_X_TARGETDURATION = await getAsync('HLS_EXT_X_TARGETDURATION');
  const PLAYLIST = await lrangeAsync('HLS_ORIGINAL', 0, -1);
  const data = `#EXTM3U
#EXT-X-PLAYLIST-TYPE:EVENT
#EXT-X-TARGETDURATION:${EXT_X_TARGETDURATION}
#EXT-X-VERSION:3
#EXT-X-MEDIA-SEQUENCE:0
${PLAYLIST.map(x => `#EXTINF:${x.split('/')[1]},\norigin/${x.split('/')[0]}\n`).join('')}`;
  res.set('content-type', 'audio/mpegurl');
  return res.send(data);
});
app.get('/check', async (req, res) => {
  const data = await lpopAsync('HLS_WAITING');
  if (data === null) {
    return res.json({ data: null });
  }
  await rpushAsync('HLS_PROGRESS', data);
  return res.json({ data: data.split('/')[0] });
});
app.post('/upload', upload.single('file'), async (req, res) => {
  await saddAsync('HLS_UPLOADED', req.file.originalname);
  return res.json({ success: true });
});
app.use((req, res, next) => {
  return res.status(404).json({ error: 'Not Found' });
});
app.use((err, req, res, next) => {
  return res.status(500).json({ error: 'Internal Server Error' });
});
app.listen(3000);
const main = async () => {
  for (;;) {
    try {
      const m3u8 = await readM3U8(`${RTMP_INPUT_FOLDER}/index.m3u8`);
      const lastSeq = await getAsync('HLS_EXT_X_MEDIA_SEQUENCE');
      if (m3u8.EXT_X_MEDIA_SEQUENCE !== lastSeq) {
        // A media sequence of 0 marks a fresh stream: reset queues and output folders.
        if (m3u8.EXT_X_MEDIA_SEQUENCE === '0') {
          await delAsync('HLS_WAITING');
          await delAsync('HLS_PROGRESS');
          await delAsync('HLS_UPLOADED');
          await delAsync('HLS_COMPLETE');
          await delAsync('HLS_ORIGINAL'); // TODO: debug
          await fs.rmdir(__dirname + '/public/origin/', { recursive: true });
          await fs.rmdir(__dirname + '/public/live/', { recursive: true });
          await fs.mkdir(__dirname + '/public/origin/');
          await fs.mkdir(__dirname + '/public/live/');
          await setAsync('HLS_EXT_X_TARGETDURATION', m3u8.EXT_X_TARGETDURATION);
        }
        // Take the newest segment, expose the original copy, and queue it for the worker.
        const item = m3u8.PLAYLIST.pop();
        await fs.copyFile(`${RTMP_INPUT_FOLDER}/${item.file}`, __dirname + `/public/origin/${item.file}`);
        await rpushAsync('HLS_WAITING', `${item.file}/${item.time}`);
        await rpushAsync('HLS_ORIGINAL', `${item.file}/${item.time}`); // TODO: debug
        await setAsync('HLS_EXT_X_MEDIA_SEQUENCE', m3u8.EXT_X_MEDIA_SEQUENCE);
      }
      // Promote a segment from PROGRESS to COMPLETE once the worker has uploaded it;
      // only the head of the queue is checked, so playback order is preserved.
      const first_of_progress = await lrangeAsync('HLS_PROGRESS', 0, 0);
      if (first_of_progress.length !== 0 && await sismemberAsync('HLS_UPLOADED', first_of_progress[0].split('/')[0]) === 1) {
        await rpushAsync('HLS_COMPLETE', first_of_progress[0]);
        await lpopAsync('HLS_PROGRESS');
        await sremAsync('HLS_UPLOADED', first_of_progress[0].split('/')[0]);
      }
    } catch (e) {
      // The source playlist may not exist yet or may be mid-write; retry on the next tick.
    }
    await sleep(1000);
  }
};
main().catch(err => console.error(err));
const fs = require('fs').promises;
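// Parses an HLS playlist and returns its header fields plus one { file, time } entry
// per #EXTINF segment, e.g.
// { EXT_X_VERSION: '3', EXT_X_MEDIA_SEQUENCE: '0', EXT_X_TARGETDURATION: '4',
//   PLAYLIST: [{ file: 'segment-0.ts', time: '4.000' }, ...] }
// All values are kept as strings; 'segment-0.ts' / '4.000' are illustrative only.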
module.exports.readM3U8 = async (filePath) => {
  const data = await fs.readFile(filePath);
  const EXT_X_VERSION = String(data).split('#EXT-X-VERSION:')[1].split('\n')[0];
  const EXT_X_MEDIA_SEQUENCE = String(data).split('#EXT-X-MEDIA-SEQUENCE:')[1].split('\n')[0];
  const EXT_X_TARGETDURATION = String(data).split('#EXT-X-TARGETDURATION:')[1].split('\n')[0];
  const EXTINF = String(data).split('#EXTINF:');
  EXTINF.shift();
  const PLAYLIST = EXTINF.map(x => ({ file: x.split('\n')[1], time: x.split(',')[0] }));
  return {
    EXT_X_VERSION,
    EXT_X_MEDIA_SEQUENCE,
    EXT_X_TARGETDURATION,
    PLAYLIST,
  };
};