Refactor transcoding job handlers

This commit is contained in:
Chocobozzz 2021-01-21 15:58:17 +01:00
parent 3b01f4c0ac
commit 24516aa26a
No known key found for this signature in database
GPG Key ID: 583A612D890159BE
8 changed files with 165 additions and 94 deletions

View File

@ -47,7 +47,7 @@ async function run () {
for (const resolution of resolutionsEnabled) { for (const resolution of resolutionsEnabled) {
dataInput.push({ dataInput.push({
type: 'hls', type: 'new-resolution-to-hls',
videoUUID: video.uuid, videoUUID: video.uuid,
resolution, resolution,
isPortraitMode: false, isPortraitMode: false,
@ -56,14 +56,14 @@ async function run () {
} }
} else if (program.resolution !== undefined) { } else if (program.resolution !== undefined) {
dataInput.push({ dataInput.push({
type: 'new-resolution' as 'new-resolution', type: 'new-resolution-to-webtorrent',
videoUUID: video.uuid, videoUUID: video.uuid,
isNewVideo: false, isNewVideo: false,
resolution: program.resolution resolution: program.resolution
}) })
} else { } else {
dataInput.push({ dataInput.push({
type: 'optimize' as 'optimize', type: 'optimize-to-webtorrent',
videoUUID: video.uuid, videoUUID: video.uuid,
isNewVideo: false isNewVideo: false
}) })

View File

@ -74,14 +74,14 @@ function addOptimizeOrMergeAudioJob (video: MVideo, videoFile: MVideoFile) {
if (videoFile.isAudio()) { if (videoFile.isAudio()) {
dataInput = { dataInput = {
type: 'merge-audio' as 'merge-audio', type: 'merge-audio-to-webtorrent',
resolution: DEFAULT_AUDIO_RESOLUTION, resolution: DEFAULT_AUDIO_RESOLUTION,
videoUUID: video.uuid, videoUUID: video.uuid,
isNewVideo: true isNewVideo: true
} }
} else { } else {
dataInput = { dataInput = {
type: 'optimize' as 'optimize', type: 'optimize-to-webtorrent',
videoUUID: video.uuid, videoUUID: video.uuid,
isNewVideo: true isNewVideo: true
} }

View File

@ -9,7 +9,7 @@ import { getVideoFileFPS, getVideoFileResolution } from '../../../helpers/ffprob
import { logger } from '../../../helpers/logger' import { logger } from '../../../helpers/logger'
import { VideoModel } from '../../../models/video/video' import { VideoModel } from '../../../models/video/video'
import { VideoFileModel } from '../../../models/video/video-file' import { VideoFileModel } from '../../../models/video/video-file'
import { publishNewResolutionIfNeeded } from './video-transcoding' import { onNewWebTorrentFileResolution } from './video-transcoding'
async function processVideoFileImport (job: Bull.Job) { async function processVideoFileImport (job: Bull.Job) {
const payload = job.data as VideoFileImportPayload const payload = job.data as VideoFileImportPayload
@ -24,7 +24,7 @@ async function processVideoFileImport (job: Bull.Job) {
await updateVideoFile(video, payload.filePath) await updateVideoFile(video, payload.filePath)
await publishNewResolutionIfNeeded(video) await onNewWebTorrentFileResolution(video)
return video return video
} }

View File

@ -7,7 +7,7 @@ import { LiveManager } from '@server/lib/live-manager'
import { generateVideoMiniature } from '@server/lib/thumbnail' import { generateVideoMiniature } from '@server/lib/thumbnail'
import { publishAndFederateIfNeeded } from '@server/lib/video' import { publishAndFederateIfNeeded } from '@server/lib/video'
import { getHLSDirectory } from '@server/lib/video-paths' import { getHLSDirectory } from '@server/lib/video-paths'
import { generateHlsPlaylistFromTS } from '@server/lib/video-transcoding' import { generateHlsPlaylistResolutionFromTS } from '@server/lib/video-transcoding'
import { VideoModel } from '@server/models/video/video' import { VideoModel } from '@server/models/video/video'
import { VideoFileModel } from '@server/models/video/video-file' import { VideoFileModel } from '@server/models/video/video-file'
import { VideoLiveModel } from '@server/models/video/video-live' import { VideoLiveModel } from '@server/models/video/video-live'
@ -102,7 +102,7 @@ async function saveLive (video: MVideo, live: MVideoLive) {
const { videoFileResolution, isPortraitMode } = await getVideoFileResolution(concatenatedTsFilePath, probe) const { videoFileResolution, isPortraitMode } = await getVideoFileResolution(concatenatedTsFilePath, probe)
const outputPath = await generateHlsPlaylistFromTS({ const outputPath = await generateHlsPlaylistResolutionFromTS({
video: videoWithFiles, video: videoWithFiles,
concatenatedTsFilePath, concatenatedTsFilePath,
resolution: videoFileResolution, resolution: videoFileResolution,

View File

@ -1,8 +1,10 @@
import * as Bull from 'bull' import * as Bull from 'bull'
import { TranscodeOptionsType } from '@server/helpers/ffmpeg-utils'
import { publishAndFederateIfNeeded } from '@server/lib/video' import { publishAndFederateIfNeeded } from '@server/lib/video'
import { getVideoFilePath } from '@server/lib/video-paths' import { getVideoFilePath } from '@server/lib/video-paths'
import { MVideoFullLight, MVideoUUID, MVideoWithFile } from '@server/types/models' import { MVideoFullLight, MVideoUUID, MVideoWithFile } from '@server/types/models'
import { import {
HLSTranscodingPayload,
MergeAudioTranscodingPayload, MergeAudioTranscodingPayload,
NewResolutionTranscodingPayload, NewResolutionTranscodingPayload,
OptimizeTranscodingPayload, OptimizeTranscodingPayload,
@ -16,9 +18,31 @@ import { sequelizeTypescript } from '../../../initializers/database'
import { VideoModel } from '../../../models/video/video' import { VideoModel } from '../../../models/video/video'
import { federateVideoIfNeeded } from '../../activitypub/videos' import { federateVideoIfNeeded } from '../../activitypub/videos'
import { Notifier } from '../../notifier' import { Notifier } from '../../notifier'
import { generateHlsPlaylist, mergeAudioVideofile, optimizeOriginalVideofile, transcodeNewResolution } from '../../video-transcoding' import {
generateHlsPlaylistResolution,
mergeAudioVideofile,
optimizeOriginalVideofile,
transcodeNewWebTorrentResolution
} from '../../video-transcoding'
import { JobQueue } from '../job-queue' import { JobQueue } from '../job-queue'
import { TranscodeOptionsType } from '@server/helpers/ffmpeg-utils'
const handlers: { [ id: string ]: (job: Bull.Job, payload: VideoTranscodingPayload, video: MVideoFullLight) => Promise<any> } = {
// Deprecated, introduced in 3.1
'hls': handleHLSJob,
'new-resolution-to-hls': handleHLSJob,
// Deprecated, introduced in 3.1
'new-resolution': handleNewWebTorrentResolutionJob,
'new-resolution-to-webtorrent': handleNewWebTorrentResolutionJob,
// Deprecated, introduced in 3.1
'merge-audio': handleWebTorrentMergeAudioJob,
'merge-audio-to-webtorrent': handleWebTorrentMergeAudioJob,
// Deprecated, introduced in 3.1
'optimize': handleWebTorrentOptimizeJob,
'optimize-to-webtorrent': handleWebTorrentOptimizeJob
}
async function processVideoTranscoding (job: Bull.Job) { async function processVideoTranscoding (job: Bull.Job) {
const payload = job.data as VideoTranscodingPayload const payload = job.data as VideoTranscodingPayload
@ -31,42 +55,62 @@ async function processVideoTranscoding (job: Bull.Job) {
return undefined return undefined
} }
if (payload.type === 'hls') { const handler = handlers[payload.type]
const videoFileInput = payload.copyCodecs
? video.getWebTorrentFile(payload.resolution)
: video.getMaxQualityFile()
const videoOrStreamingPlaylist = videoFileInput.getVideoOrStreamingPlaylist() if (!handler) {
const videoInputPath = getVideoFilePath(videoOrStreamingPlaylist, videoFileInput) throw new Error('Cannot find transcoding handler for ' + payload.type)
await generateHlsPlaylist({
video,
videoInputPath,
resolution: payload.resolution,
copyCodecs: payload.copyCodecs,
isPortraitMode: payload.isPortraitMode || false,
job
})
await retryTransactionWrapper(onHlsPlaylistGenerationSuccess, video)
} else if (payload.type === 'new-resolution') {
await transcodeNewResolution(video, payload.resolution, payload.isPortraitMode || false, job)
await retryTransactionWrapper(publishNewResolutionIfNeeded, video, payload)
} else if (payload.type === 'merge-audio') {
await mergeAudioVideofile(video, payload.resolution, job)
await retryTransactionWrapper(publishNewResolutionIfNeeded, video, payload)
} else {
const transcodeType = await optimizeOriginalVideofile(video, video.getMaxQualityFile(), job)
await retryTransactionWrapper(onVideoFileOptimizerSuccess, video, payload, transcodeType)
} }
await handler(job, payload, video)
return video return video
} }
async function onHlsPlaylistGenerationSuccess (video: MVideoFullLight) { // ---------------------------------------------------------------------------
// Job handlers
// ---------------------------------------------------------------------------
async function handleHLSJob (job: Bull.Job, payload: HLSTranscodingPayload, video: MVideoFullLight) {
const videoFileInput = payload.copyCodecs
? video.getWebTorrentFile(payload.resolution)
: video.getMaxQualityFile()
const videoOrStreamingPlaylist = videoFileInput.getVideoOrStreamingPlaylist()
const videoInputPath = getVideoFilePath(videoOrStreamingPlaylist, videoFileInput)
await generateHlsPlaylistResolution({
video,
videoInputPath,
resolution: payload.resolution,
copyCodecs: payload.copyCodecs,
isPortraitMode: payload.isPortraitMode || false,
job
})
await retryTransactionWrapper(onHlsPlaylistGeneration, video)
}
async function handleNewWebTorrentResolutionJob (job: Bull.Job, payload: NewResolutionTranscodingPayload, video: MVideoFullLight) {
await transcodeNewWebTorrentResolution(video, payload.resolution, payload.isPortraitMode || false, job)
await retryTransactionWrapper(onNewWebTorrentFileResolution, video, payload)
}
async function handleWebTorrentMergeAudioJob (job: Bull.Job, payload: MergeAudioTranscodingPayload, video: MVideoFullLight) {
await mergeAudioVideofile(video, payload.resolution, job)
await retryTransactionWrapper(onNewWebTorrentFileResolution, video, payload)
}
async function handleWebTorrentOptimizeJob (job: Bull.Job, payload: OptimizeTranscodingPayload, video: MVideoFullLight) {
const transcodeType = await optimizeOriginalVideofile(video, video.getMaxQualityFile(), job)
await retryTransactionWrapper(onVideoFileOptimizer, video, payload, transcodeType)
}
// ---------------------------------------------------------------------------
async function onHlsPlaylistGeneration (video: MVideoFullLight) {
if (video === undefined) return undefined if (video === undefined) return undefined
// We generated the HLS playlist, we don't need the webtorrent files anymore if the admin disabled it // We generated the HLS playlist, we don't need the webtorrent files anymore if the admin disabled it
@ -82,13 +126,7 @@ async function onHlsPlaylistGenerationSuccess (video: MVideoFullLight) {
return publishAndFederateIfNeeded(video) return publishAndFederateIfNeeded(video)
} }
async function publishNewResolutionIfNeeded (video: MVideoUUID, payload?: NewResolutionTranscodingPayload | MergeAudioTranscodingPayload) { async function onVideoFileOptimizer (
await publishAndFederateIfNeeded(video)
createHlsJobIfEnabled(Object.assign({}, payload, { copyCodecs: true }))
}
async function onVideoFileOptimizerSuccess (
videoArg: MVideoWithFile, videoArg: MVideoWithFile,
payload: OptimizeTranscodingPayload, payload: OptimizeTranscodingPayload,
transcodeType: TranscodeOptionsType transcodeType: TranscodeOptionsType
@ -113,7 +151,7 @@ async function onVideoFileOptimizerSuccess (
let videoPublished = false let videoPublished = false
// Generate HLS version of the max quality file // Generate HLS version of the original file
const originalFileHLSPayload = Object.assign({}, payload, { const originalFileHLSPayload = Object.assign({}, payload, {
isPortraitMode, isPortraitMode,
resolution: videoDatabase.getMaxQualityFile().resolution, resolution: videoDatabase.getMaxQualityFile().resolution,
@ -122,36 +160,11 @@ async function onVideoFileOptimizerSuccess (
}) })
createHlsJobIfEnabled(originalFileHLSPayload) createHlsJobIfEnabled(originalFileHLSPayload)
if (resolutionsEnabled.length !== 0) { const hasNewResolutions = createLowerResolutionsJobs(videoDatabase, videoFileResolution, isPortraitMode)
for (const resolution of resolutionsEnabled) {
let dataInput: VideoTranscodingPayload
if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED) { if (!hasNewResolutions) {
dataInput = {
type: 'new-resolution' as 'new-resolution',
videoUUID: videoDatabase.uuid,
resolution,
isPortraitMode
}
} else if (CONFIG.TRANSCODING.HLS.ENABLED) {
dataInput = {
type: 'hls',
videoUUID: videoDatabase.uuid,
resolution,
isPortraitMode,
copyCodecs: false
}
}
JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput })
}
logger.info('Transcoding jobs created for uuid %s.', videoDatabase.uuid, { resolutionsEnabled })
} else {
// No transcoding to do, it's now published // No transcoding to do, it's now published
videoPublished = await videoDatabase.publishIfNeededAndSave(t) videoPublished = await videoDatabase.publishIfNeededAndSave(t)
logger.info('No transcoding jobs created for video %s (no resolutions).', videoDatabase.uuid, { privacy: videoDatabase.privacy })
} }
await federateVideoIfNeeded(videoDatabase, payload.isNewVideo, t) await federateVideoIfNeeded(videoDatabase, payload.isNewVideo, t)
@ -163,11 +176,20 @@ async function onVideoFileOptimizerSuccess (
if (videoPublished) Notifier.Instance.notifyOnVideoPublishedAfterTranscoding(videoDatabase) if (videoPublished) Notifier.Instance.notifyOnVideoPublishedAfterTranscoding(videoDatabase)
} }
async function onNewWebTorrentFileResolution (
video: MVideoUUID,
payload?: NewResolutionTranscodingPayload | MergeAudioTranscodingPayload
) {
await publishAndFederateIfNeeded(video)
createHlsJobIfEnabled(Object.assign({}, payload, { copyCodecs: true }))
}
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
export { export {
processVideoTranscoding, processVideoTranscoding,
publishNewResolutionIfNeeded onNewWebTorrentFileResolution
} }
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@ -175,8 +197,8 @@ export {
function createHlsJobIfEnabled (payload: { videoUUID: string, resolution: number, isPortraitMode?: boolean, copyCodecs: boolean }) { function createHlsJobIfEnabled (payload: { videoUUID: string, resolution: number, isPortraitMode?: boolean, copyCodecs: boolean }) {
// Generate HLS playlist? // Generate HLS playlist?
if (payload && CONFIG.TRANSCODING.HLS.ENABLED) { if (payload && CONFIG.TRANSCODING.HLS.ENABLED) {
const hlsTranscodingPayload = { const hlsTranscodingPayload: HLSTranscodingPayload = {
type: 'hls' as 'hls', type: 'new-resolution-to-hls',
videoUUID: payload.videoUUID, videoUUID: payload.videoUUID,
resolution: payload.resolution, resolution: payload.resolution,
isPortraitMode: payload.isPortraitMode, isPortraitMode: payload.isPortraitMode,
@ -186,3 +208,46 @@ function createHlsJobIfEnabled (payload: { videoUUID: string, resolution: number
return JobQueue.Instance.createJob({ type: 'video-transcoding', payload: hlsTranscodingPayload }) return JobQueue.Instance.createJob({ type: 'video-transcoding', payload: hlsTranscodingPayload })
} }
} }
function createLowerResolutionsJobs (video: MVideoFullLight, videoFileResolution: number, isPortraitMode: boolean) {
// Create transcoding jobs if there are enabled resolutions
const resolutionsEnabled = computeResolutionsToTranscode(videoFileResolution, 'vod')
logger.info(
'Resolutions computed for video %s and origin file resolution of %d.', video.uuid, videoFileResolution,
{ resolutions: resolutionsEnabled }
)
if (resolutionsEnabled.length === 0) {
logger.info('No transcoding jobs created for video %s (no resolutions).', video.uuid)
return false
}
for (const resolution of resolutionsEnabled) {
let dataInput: VideoTranscodingPayload
if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED) {
// WebTorrent will create subsequent HLS job
dataInput = {
type: 'new-resolution-to-webtorrent',
videoUUID: video.uuid,
resolution,
isPortraitMode
}
} else if (CONFIG.TRANSCODING.HLS.ENABLED) {
dataInput = {
type: 'new-resolution-to-hls',
videoUUID: video.uuid,
resolution,
isPortraitMode,
copyCodecs: false
}
}
JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput })
}
logger.info('Transcoding jobs created for uuid %s.', video.uuid, { resolutionsEnabled })
return true
}

View File

@ -60,7 +60,7 @@ async function optimizeOriginalVideofile (video: MVideoWithFile, inputVideoFile:
const videoOutputPath = getVideoFilePath(video, inputVideoFile) const videoOutputPath = getVideoFilePath(video, inputVideoFile)
await onVideoFileTranscoding(video, inputVideoFile, videoTranscodedPath, videoOutputPath) await onWebTorrentVideoFileTranscoding(video, inputVideoFile, videoTranscodedPath, videoOutputPath)
return transcodeType return transcodeType
} catch (err) { } catch (err) {
@ -72,7 +72,7 @@ async function optimizeOriginalVideofile (video: MVideoWithFile, inputVideoFile:
} }
// Transcode the original video file to a lower resolution. // Transcode the original video file to a lower resolution.
async function transcodeNewResolution (video: MVideoWithFile, resolution: VideoResolution, isPortrait: boolean, job: Job) { async function transcodeNewWebTorrentResolution (video: MVideoWithFile, resolution: VideoResolution, isPortrait: boolean, job: Job) {
const transcodeDirectory = CONFIG.STORAGE.TMP_DIR const transcodeDirectory = CONFIG.STORAGE.TMP_DIR
const extname = '.mp4' const extname = '.mp4'
@ -118,7 +118,7 @@ async function transcodeNewResolution (video: MVideoWithFile, resolution: VideoR
await transcode(transcodeOptions) await transcode(transcodeOptions)
return onVideoFileTranscoding(video, newVideoFile, videoTranscodedPath, videoOutputPath) return onWebTorrentVideoFileTranscoding(video, newVideoFile, videoTranscodedPath, videoOutputPath)
} }
// Merge an image with an audio file to create a video // Merge an image with an audio file to create a video
@ -170,11 +170,11 @@ async function mergeAudioVideofile (video: MVideoWithAllFiles, resolution: Video
video.duration = await getDurationFromVideoFile(videoTranscodedPath) video.duration = await getDurationFromVideoFile(videoTranscodedPath)
await video.save() await video.save()
return onVideoFileTranscoding(video, inputVideoFile, videoTranscodedPath, videoOutputPath) return onWebTorrentVideoFileTranscoding(video, inputVideoFile, videoTranscodedPath, videoOutputPath)
} }
// Concat TS segments from a live video to a fragmented mp4 HLS playlist // Concat TS segments from a live video to a fragmented mp4 HLS playlist
async function generateHlsPlaylistFromTS (options: { async function generateHlsPlaylistResolutionFromTS (options: {
video: MVideoWithFile video: MVideoWithFile
concatenatedTsFilePath: string concatenatedTsFilePath: string
resolution: VideoResolution resolution: VideoResolution
@ -192,7 +192,7 @@ async function generateHlsPlaylistFromTS (options: {
} }
// Generate an HLS playlist from an input file, and update the master playlist // Generate an HLS playlist from an input file, and update the master playlist
function generateHlsPlaylist (options: { function generateHlsPlaylistResolution (options: {
video: MVideoWithFile video: MVideoWithFile
videoInputPath: string videoInputPath: string
resolution: VideoResolution resolution: VideoResolution
@ -224,17 +224,22 @@ function getEnabledResolutions (type: 'vod' | 'live') {
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
export { export {
generateHlsPlaylist, generateHlsPlaylistResolution,
generateHlsPlaylistFromTS, generateHlsPlaylistResolutionFromTS,
optimizeOriginalVideofile, optimizeOriginalVideofile,
transcodeNewResolution, transcodeNewWebTorrentResolution,
mergeAudioVideofile, mergeAudioVideofile,
getEnabledResolutions getEnabledResolutions
} }
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
async function onVideoFileTranscoding (video: MVideoWithFile, videoFile: MVideoFile, transcodingPath: string, outputPath: string) { async function onWebTorrentVideoFileTranscoding (
video: MVideoWithFile,
videoFile: MVideoFile,
transcodingPath: string,
outputPath: string
) {
const stats = await stat(transcodingPath) const stats = await stat(transcodingPath)
const fps = await getVideoFileFPS(transcodingPath) const fps = await getVideoFileFPS(transcodingPath)
const metadata = await getMetadataFromFile(transcodingPath) const metadata = await getMetadataFromFile(transcodingPath)

View File

@ -1735,6 +1735,7 @@ export class VideoModel extends Model {
} }
getQualityFileBy<T extends MVideoWithFile> (this: T, fun: (files: MVideoFile[], it: (file: MVideoFile) => number) => MVideoFile) { getQualityFileBy<T extends MVideoWithFile> (this: T, fun: (files: MVideoFile[], it: (file: MVideoFile) => number) => MVideoFile) {
// We first transcode to WebTorrent format, so try this array first
if (Array.isArray(this.VideoFiles) && this.VideoFiles.length !== 0) { if (Array.isArray(this.VideoFiles) && this.VideoFiles.length !== 0) {
const file = fun(this.VideoFiles, file => file.resolution) const file = fun(this.VideoFiles, file => file.resolution)

View File

@ -100,26 +100,26 @@ interface BaseTranscodingPayload {
isNewVideo?: boolean isNewVideo?: boolean
} }
interface HLSTranscodingPayload extends BaseTranscodingPayload { export interface HLSTranscodingPayload extends BaseTranscodingPayload {
type: 'hls' type: 'new-resolution-to-hls'
isPortraitMode?: boolean isPortraitMode?: boolean
resolution: VideoResolution resolution: VideoResolution
copyCodecs: boolean copyCodecs: boolean
} }
export interface NewResolutionTranscodingPayload extends BaseTranscodingPayload { export interface NewResolutionTranscodingPayload extends BaseTranscodingPayload {
type: 'new-resolution' type: 'new-resolution-to-webtorrent'
isPortraitMode?: boolean isPortraitMode?: boolean
resolution: VideoResolution resolution: VideoResolution
} }
export interface MergeAudioTranscodingPayload extends BaseTranscodingPayload { export interface MergeAudioTranscodingPayload extends BaseTranscodingPayload {
type: 'merge-audio' type: 'merge-audio-to-webtorrent'
resolution: VideoResolution resolution: VideoResolution
} }
export interface OptimizeTranscodingPayload extends BaseTranscodingPayload { export interface OptimizeTranscodingPayload extends BaseTranscodingPayload {
type: 'optimize' type: 'optimize-to-webtorrent'
} }
export type VideoTranscodingPayload = export type VideoTranscodingPayload =