Support live session in server

This commit is contained in:
Chocobozzz 2022-05-03 11:38:07 +02:00
parent 86c5229b4d
commit 26e3e98ff0
No known key found for this signature in database
GPG Key ID: 583A612D890159BE
29 changed files with 814 additions and 66 deletions

View File

@ -15,4 +15,5 @@ p-autocomplete {
.badge { .badge {
font-size: 13px; font-size: 13px;
margin-right: 5px;
} }

View File

@ -1,13 +1,21 @@
import express from 'express' import express from 'express'
import { exists } from '@server/helpers/custom-validators/misc' import { exists } from '@server/helpers/custom-validators/misc'
import { createReqFiles } from '@server/helpers/express-utils' import { createReqFiles } from '@server/helpers/express-utils'
import { getFormattedObjects } from '@server/helpers/utils'
import { ASSETS_PATH, MIMETYPES } from '@server/initializers/constants' import { ASSETS_PATH, MIMETYPES } from '@server/initializers/constants'
import { getLocalVideoActivityPubUrl } from '@server/lib/activitypub/url' import { getLocalVideoActivityPubUrl } from '@server/lib/activitypub/url'
import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' import { federateVideoIfNeeded } from '@server/lib/activitypub/videos'
import { Hooks } from '@server/lib/plugins/hooks' import { Hooks } from '@server/lib/plugins/hooks'
import { buildLocalVideoFromReq, buildVideoThumbnailsFromReq, setVideoTags } from '@server/lib/video' import { buildLocalVideoFromReq, buildVideoThumbnailsFromReq, setVideoTags } from '@server/lib/video'
import { videoLiveAddValidator, videoLiveGetValidator, videoLiveUpdateValidator } from '@server/middlewares/validators/videos/video-live' import {
videoLiveAddValidator,
videoLiveFindReplaySessionValidator,
videoLiveGetValidator,
videoLiveListSessionsValidator,
videoLiveUpdateValidator
} from '@server/middlewares/validators/videos/video-live'
import { VideoLiveModel } from '@server/models/video/video-live' import { VideoLiveModel } from '@server/models/video/video-live'
import { VideoLiveSessionModel } from '@server/models/video/video-live-session'
import { MVideoDetails, MVideoFullLight } from '@server/types/models' import { MVideoDetails, MVideoFullLight } from '@server/types/models'
import { buildUUID, uuidToShort } from '@shared/extra-utils' import { buildUUID, uuidToShort } from '@shared/extra-utils'
import { HttpStatusCode, LiveVideoCreate, LiveVideoLatencyMode, LiveVideoUpdate, UserRight, VideoState } from '@shared/models' import { HttpStatusCode, LiveVideoCreate, LiveVideoLatencyMode, LiveVideoUpdate, UserRight, VideoState } from '@shared/models'
@ -28,6 +36,13 @@ liveRouter.post('/live',
asyncRetryTransactionMiddleware(addLiveVideo) asyncRetryTransactionMiddleware(addLiveVideo)
) )
liveRouter.get('/live/:videoId/sessions',
authenticate,
asyncMiddleware(videoLiveGetValidator),
videoLiveListSessionsValidator,
asyncMiddleware(getLiveVideoSessions)
)
liveRouter.get('/live/:videoId', liveRouter.get('/live/:videoId',
optionalAuthenticate, optionalAuthenticate,
asyncMiddleware(videoLiveGetValidator), asyncMiddleware(videoLiveGetValidator),
@ -41,6 +56,11 @@ liveRouter.put('/live/:videoId',
asyncRetryTransactionMiddleware(updateLiveVideo) asyncRetryTransactionMiddleware(updateLiveVideo)
) )
liveRouter.get('/:videoId/live-session',
asyncMiddleware(videoLiveFindReplaySessionValidator),
getLiveReplaySession
)
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
export { export {
@ -55,6 +75,20 @@ function getLiveVideo (req: express.Request, res: express.Response) {
return res.json(videoLive.toFormattedJSON(canSeePrivateLiveInformation(res))) return res.json(videoLive.toFormattedJSON(canSeePrivateLiveInformation(res)))
} }
function getLiveReplaySession (req: express.Request, res: express.Response) {
const session = res.locals.videoLiveSession
return res.json(session.toFormattedJSON())
}
async function getLiveVideoSessions (req: express.Request, res: express.Response) {
const videoLive = res.locals.videoLive
const data = await VideoLiveSessionModel.listSessionsOfLiveForAPI({ videoId: videoLive.videoId })
return res.json(getFormattedObjects(data, data.length))
}
function canSeePrivateLiveInformation (res: express.Response) { function canSeePrivateLiveInformation (res: express.Response) {
const user = res.locals.oauth?.token.User const user = res.locals.oauth?.token.User
if (!user) return false if (!user) return false

View File

@ -24,7 +24,7 @@ import { CONFIG, registerConfigChangedHandler } from './config'
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
const LAST_MIGRATION_VERSION = 705 const LAST_MIGRATION_VERSION = 710
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------

View File

@ -7,6 +7,7 @@ import { UserModel } from '@server/models/user/user'
import { UserNotificationModel } from '@server/models/user/user-notification' import { UserNotificationModel } from '@server/models/user/user-notification'
import { UserVideoHistoryModel } from '@server/models/user/user-video-history' import { UserVideoHistoryModel } from '@server/models/user/user-video-history'
import { VideoJobInfoModel } from '@server/models/video/video-job-info' import { VideoJobInfoModel } from '@server/models/video/video-job-info'
import { VideoLiveSessionModel } from '@server/models/video/video-live-session'
import { LocalVideoViewerModel } from '@server/models/view/local-video-viewer' import { LocalVideoViewerModel } from '@server/models/view/local-video-viewer'
import { LocalVideoViewerWatchSectionModel } from '@server/models/view/local-video-viewer-watch-section' import { LocalVideoViewerWatchSectionModel } from '@server/models/view/local-video-viewer-watch-section'
import { isTestInstance } from '../helpers/core-utils' import { isTestInstance } from '../helpers/core-utils'
@ -135,6 +136,7 @@ async function initDatabaseModels (silent: boolean) {
VideoRedundancyModel, VideoRedundancyModel,
UserVideoHistoryModel, UserVideoHistoryModel,
VideoLiveModel, VideoLiveModel,
VideoLiveSessionModel,
AccountBlocklistModel, AccountBlocklistModel,
ServerBlocklistModel, ServerBlocklistModel,
UserNotificationModel, UserNotificationModel,

View File

@ -0,0 +1,34 @@
import * as Sequelize from 'sequelize'
async function up (utils: {
transaction: Sequelize.Transaction
queryInterface: Sequelize.QueryInterface
sequelize: Sequelize.Sequelize
db: any
}): Promise<void> {
const { transaction } = utils
const query = `
CREATE TABLE IF NOT EXISTS "videoLiveSession" (
"id" serial,
"startDate" timestamp with time zone NOT NULL,
"endDate" timestamp with time zone,
"error" integer,
"replayVideoId" integer REFERENCES "video" ("id") ON DELETE SET NULL ON UPDATE CASCADE,
"liveVideoId" integer REFERENCES "video" ("id") ON DELETE SET NULL ON UPDATE CASCADE,
"createdAt" timestamp with time zone NOT NULL,
"updatedAt" timestamp with time zone NOT NULL,
PRIMARY KEY ("id")
);
`
await utils.sequelize.query(query, { transaction })
}
function down () {
throw new Error('Not implemented.')
}
export {
up,
down
}

View File

@ -15,13 +15,14 @@ import { generateVideoMiniature } from '@server/lib/thumbnail'
import { generateHlsPlaylistResolutionFromTS } from '@server/lib/transcoding/transcoding' import { generateHlsPlaylistResolutionFromTS } from '@server/lib/transcoding/transcoding'
import { moveToNextState } from '@server/lib/video-state' import { moveToNextState } from '@server/lib/video-state'
import { VideoModel } from '@server/models/video/video' import { VideoModel } from '@server/models/video/video'
import { VideoBlacklistModel } from '@server/models/video/video-blacklist'
import { VideoFileModel } from '@server/models/video/video-file' import { VideoFileModel } from '@server/models/video/video-file'
import { VideoLiveModel } from '@server/models/video/video-live' import { VideoLiveModel } from '@server/models/video/video-live'
import { VideoLiveSessionModel } from '@server/models/video/video-live-session'
import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
import { MVideo, MVideoLive, MVideoWithAllFiles } from '@server/types/models' import { MVideo, MVideoLive, MVideoLiveSession, MVideoWithAllFiles } from '@server/types/models'
import { ThumbnailType, VideoLiveEndingPayload, VideoState } from '@shared/models' import { ThumbnailType, VideoLiveEndingPayload, VideoState } from '@shared/models'
import { logger } from '../../../helpers/logger' import { logger } from '../../../helpers/logger'
import { VideoBlacklistModel } from '@server/models/video/video-blacklist'
async function processVideoLiveEnding (job: Job) { async function processVideoLiveEnding (job: Job) {
const payload = job.data as VideoLiveEndingPayload const payload = job.data as VideoLiveEndingPayload
@ -32,27 +33,28 @@ async function processVideoLiveEnding (job: Job) {
logger.warn('Video live %d does not exist anymore. Cannot process live ending.', payload.videoId) logger.warn('Video live %d does not exist anymore. Cannot process live ending.', payload.videoId)
} }
const video = await VideoModel.load(payload.videoId) const liveVideo = await VideoModel.load(payload.videoId)
const live = await VideoLiveModel.loadByVideoId(payload.videoId) const live = await VideoLiveModel.loadByVideoId(payload.videoId)
const liveSession = await VideoLiveSessionModel.load(payload.liveSessionId)
if (!video || !live) { if (!liveVideo || !live || !liveSession) {
logError() logError()
return return
} }
LiveSegmentShaStore.Instance.cleanupShaSegments(video.uuid) LiveSegmentShaStore.Instance.cleanupShaSegments(liveVideo.uuid)
if (live.saveReplay !== true) { if (live.saveReplay !== true) {
return cleanupLiveAndFederate(video) return cleanupLiveAndFederate({ liveVideo })
} }
if (live.permanentLive) { if (live.permanentLive) {
await saveReplayToExternalVideo(video, payload.publishedAt, payload.replayDirectory) await saveReplayToExternalVideo({ liveVideo, liveSession, publishedAt: payload.publishedAt, replayDirectory: payload.replayDirectory })
return cleanupLiveAndFederate(video) return cleanupLiveAndFederate({ liveVideo })
} }
return replaceLiveByReplay(video, live, payload.replayDirectory) return replaceLiveByReplay({ liveVideo, live, liveSession, replayDirectory: payload.replayDirectory })
} }
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@ -63,7 +65,14 @@ export {
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
async function saveReplayToExternalVideo (liveVideo: MVideo, publishedAt: string, replayDirectory: string) { async function saveReplayToExternalVideo (options: {
liveVideo: MVideo
liveSession: MVideoLiveSession
publishedAt: string
replayDirectory: string
}) {
const { liveVideo, liveSession, publishedAt, replayDirectory } = options
await cleanupTMPLiveFiles(getLiveDirectory(liveVideo)) await cleanupTMPLiveFiles(getLiveDirectory(liveVideo))
const video = new VideoModel({ const video = new VideoModel({
@ -78,7 +87,7 @@ async function saveReplayToExternalVideo (liveVideo: MVideo, publishedAt: string
language: liveVideo.language, language: liveVideo.language,
commentsEnabled: liveVideo.commentsEnabled, commentsEnabled: liveVideo.commentsEnabled,
downloadEnabled: liveVideo.downloadEnabled, downloadEnabled: liveVideo.downloadEnabled,
waitTranscoding: liveVideo.waitTranscoding, waitTranscoding: true,
nsfw: liveVideo.nsfw, nsfw: liveVideo.nsfw,
description: liveVideo.description, description: liveVideo.description,
support: liveVideo.support, support: liveVideo.support,
@ -94,6 +103,9 @@ async function saveReplayToExternalVideo (liveVideo: MVideo, publishedAt: string
await video.save() await video.save()
liveSession.replayVideoId = video.id
await liveSession.save()
// If live is blacklisted, also blacklist the replay // If live is blacklisted, also blacklist the replay
const blacklist = await VideoBlacklistModel.loadByVideoId(liveVideo.id) const blacklist = await VideoBlacklistModel.loadByVideoId(liveVideo.id)
if (blacklist) { if (blacklist) {
@ -105,7 +117,7 @@ async function saveReplayToExternalVideo (liveVideo: MVideo, publishedAt: string
}) })
} }
await assignReplaysToVideo(video, replayDirectory) await assignReplayFilesToVideo({ video, replayDirectory })
await remove(replayDirectory) await remove(replayDirectory)
@ -117,18 +129,29 @@ async function saveReplayToExternalVideo (liveVideo: MVideo, publishedAt: string
await moveToNextState({ video, isNewVideo: true }) await moveToNextState({ video, isNewVideo: true })
} }
async function replaceLiveByReplay (video: MVideo, live: MVideoLive, replayDirectory: string) { async function replaceLiveByReplay (options: {
await cleanupTMPLiveFiles(getLiveDirectory(video)) liveVideo: MVideo
liveSession: MVideoLiveSession
live: MVideoLive
replayDirectory: string
}) {
const { liveVideo, liveSession, live, replayDirectory } = options
await cleanupTMPLiveFiles(getLiveDirectory(liveVideo))
await live.destroy() await live.destroy()
video.isLive = false liveVideo.isLive = false
video.state = VideoState.TO_TRANSCODE liveVideo.waitTranscoding = true
liveVideo.state = VideoState.TO_TRANSCODE
await video.save() await liveVideo.save()
liveSession.replayVideoId = liveVideo.id
await liveSession.save()
// Remove old HLS playlist video files // Remove old HLS playlist video files
const videoWithFiles = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.id) const videoWithFiles = await VideoModel.loadAndPopulateAccountAndServerAndTags(liveVideo.id)
const hlsPlaylist = videoWithFiles.getHLSPlaylist() const hlsPlaylist = videoWithFiles.getHLSPlaylist()
await VideoFileModel.removeHLSFilesOfVideoId(hlsPlaylist.id) await VideoFileModel.removeHLSFilesOfVideoId(hlsPlaylist.id)
@ -139,7 +162,7 @@ async function replaceLiveByReplay (video: MVideo, live: MVideoLive, replayDirec
hlsPlaylist.segmentsSha256Filename = generateHlsSha256SegmentsFilename() hlsPlaylist.segmentsSha256Filename = generateHlsSha256SegmentsFilename()
await hlsPlaylist.save() await hlsPlaylist.save()
await assignReplaysToVideo(videoWithFiles, replayDirectory) await assignReplayFilesToVideo({ video: videoWithFiles, replayDirectory })
await remove(getLiveReplayBaseDirectory(videoWithFiles)) await remove(getLiveReplayBaseDirectory(videoWithFiles))
@ -150,7 +173,7 @@ async function replaceLiveByReplay (video: MVideo, live: MVideoLive, replayDirec
videoFile: videoWithFiles.getMaxQualityFile(), videoFile: videoWithFiles.getMaxQualityFile(),
type: ThumbnailType.MINIATURE type: ThumbnailType.MINIATURE
}) })
await video.addAndSaveThumbnail(miniature) await videoWithFiles.addAndSaveThumbnail(miniature)
} }
if (videoWithFiles.getPreview().automaticallyGenerated === true) { if (videoWithFiles.getPreview().automaticallyGenerated === true) {
@ -159,13 +182,19 @@ async function replaceLiveByReplay (video: MVideo, live: MVideoLive, replayDirec
videoFile: videoWithFiles.getMaxQualityFile(), videoFile: videoWithFiles.getMaxQualityFile(),
type: ThumbnailType.PREVIEW type: ThumbnailType.PREVIEW
}) })
await video.addAndSaveThumbnail(preview) await videoWithFiles.addAndSaveThumbnail(preview)
} }
await moveToNextState({ video: videoWithFiles, isNewVideo: false }) // We consider this is a new video
await moveToNextState({ video: videoWithFiles, isNewVideo: true })
} }
async function assignReplaysToVideo (video: MVideo, replayDirectory: string) { async function assignReplayFilesToVideo (options: {
video: MVideo
replayDirectory: string
}) {
const { video, replayDirectory } = options
let durationDone = false let durationDone = false
const concatenatedTsFiles = await readdir(replayDirectory) const concatenatedTsFiles = await readdir(replayDirectory)
@ -197,11 +226,15 @@ async function assignReplaysToVideo (video: MVideo, replayDirectory: string) {
return video return video
} }
async function cleanupLiveAndFederate (video: MVideo) { async function cleanupLiveAndFederate (options: {
const streamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id) liveVideo: MVideo
await cleanupLive(video, streamingPlaylist) }) {
const { liveVideo } = options
const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.id) const streamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(liveVideo.id)
await cleanupLive(liveVideo, streamingPlaylist)
const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(liveVideo.id)
return federateVideoIfNeeded(fullVideo, false, undefined) return federateVideoIfNeeded(fullVideo, false, undefined)
} }

View File

@ -17,10 +17,11 @@ import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/
import { UserModel } from '@server/models/user/user' import { UserModel } from '@server/models/user/user'
import { VideoModel } from '@server/models/video/video' import { VideoModel } from '@server/models/video/video'
import { VideoLiveModel } from '@server/models/video/video-live' import { VideoLiveModel } from '@server/models/video/video-live'
import { VideoLiveSessionModel } from '@server/models/video/video-live-session'
import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
import { MStreamingPlaylistVideo, MVideo, MVideoLiveVideo } from '@server/types/models' import { MStreamingPlaylistVideo, MVideo, MVideoLiveSession, MVideoLiveVideo } from '@server/types/models'
import { wait } from '@shared/core-utils' import { wait } from '@shared/core-utils'
import { VideoState, VideoStreamingPlaylistType } from '@shared/models' import { LiveVideoError, VideoState, VideoStreamingPlaylistType } from '@shared/models'
import { federateVideoIfNeeded } from '../activitypub/videos' import { federateVideoIfNeeded } from '../activitypub/videos'
import { JobQueue } from '../job-queue' import { JobQueue } from '../job-queue'
import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '../paths' import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '../paths'
@ -174,10 +175,13 @@ class LiveManager {
return !!this.rtmpServer return !!this.rtmpServer
} }
stopSessionOf (videoId: number) { stopSessionOf (videoId: number, error: LiveVideoError | null) {
const sessionId = this.videoSessions.get(videoId) const sessionId = this.videoSessions.get(videoId)
if (!sessionId) return if (!sessionId) return
this.saveEndingSession(videoId, error)
.catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId) }))
this.videoSessions.delete(videoId) this.videoSessions.delete(videoId)
this.abortSession(sessionId) this.abortSession(sessionId)
} }
@ -274,6 +278,8 @@ class LiveManager {
const videoUUID = videoLive.Video.uuid const videoUUID = videoLive.Video.uuid
const localLTags = lTags(sessionId, videoUUID) const localLTags = lTags(sessionId, videoUUID)
const liveSession = await this.saveStartingSession(videoLive)
const user = await UserModel.loadByLiveId(videoLive.id) const user = await UserModel.loadByLiveId(videoLive.id)
LiveQuotaStore.Instance.addNewLive(user.id, videoLive.id) LiveQuotaStore.Instance.addNewLive(user.id, videoLive.id)
@ -299,24 +305,27 @@ class LiveManager {
localLTags localLTags
) )
this.stopSessionOf(videoId) this.stopSessionOf(videoId, LiveVideoError.BAD_SOCKET_HEALTH)
}) })
muxingSession.on('duration-exceeded', ({ videoId }) => { muxingSession.on('duration-exceeded', ({ videoId }) => {
logger.info('Stopping session of %s: max duration exceeded.', videoUUID, localLTags) logger.info('Stopping session of %s: max duration exceeded.', videoUUID, localLTags)
this.stopSessionOf(videoId) this.stopSessionOf(videoId, LiveVideoError.DURATION_EXCEEDED)
}) })
muxingSession.on('quota-exceeded', ({ videoId }) => { muxingSession.on('quota-exceeded', ({ videoId }) => {
logger.info('Stopping session of %s: user quota exceeded.', videoUUID, localLTags) logger.info('Stopping session of %s: user quota exceeded.', videoUUID, localLTags)
this.stopSessionOf(videoId) this.stopSessionOf(videoId, LiveVideoError.QUOTA_EXCEEDED)
})
muxingSession.on('ffmpeg-error', ({ videoId }) => {
this.stopSessionOf(videoId, LiveVideoError.FFMPEG_ERROR)
}) })
muxingSession.on('ffmpeg-error', ({ sessionId }) => this.abortSession(sessionId))
muxingSession.on('ffmpeg-end', ({ videoId }) => { muxingSession.on('ffmpeg-end', ({ videoId }) => {
this.onMuxingFFmpegEnd(videoId) this.onMuxingFFmpegEnd(videoId, sessionId)
}) })
muxingSession.on('after-cleanup', ({ videoId }) => { muxingSession.on('after-cleanup', ({ videoId }) => {
@ -324,7 +333,7 @@ class LiveManager {
muxingSession.destroy() muxingSession.destroy()
return this.onAfterMuxingCleanup({ videoId }) return this.onAfterMuxingCleanup({ videoId, liveSession })
.catch(err => logger.error('Error in end transmuxing.', { err, ...localLTags })) .catch(err => logger.error('Error in end transmuxing.', { err, ...localLTags }))
}) })
@ -365,15 +374,19 @@ class LiveManager {
} }
} }
private onMuxingFFmpegEnd (videoId: number) { private onMuxingFFmpegEnd (videoId: number, sessionId: string) {
this.videoSessions.delete(videoId) this.videoSessions.delete(videoId)
this.saveEndingSession(videoId, null)
.catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId) }))
} }
private async onAfterMuxingCleanup (options: { private async onAfterMuxingCleanup (options: {
videoId: number | string videoId: number | string
liveSession?: MVideoLiveSession
cleanupNow?: boolean // Default false cleanupNow?: boolean // Default false
}) { }) {
const { videoId, cleanupNow = false } = options const { videoId, liveSession: liveSessionArg, cleanupNow = false } = options
try { try {
const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId)
@ -381,13 +394,25 @@ class LiveManager {
const live = await VideoLiveModel.loadByVideoId(fullVideo.id) const live = await VideoLiveModel.loadByVideoId(fullVideo.id)
const liveSession = liveSessionArg ?? await VideoLiveSessionModel.findCurrentSessionOf(fullVideo.id)
// On server restart during a live
if (!liveSession.endDate) {
liveSession.endDate = new Date()
await liveSession.save()
}
JobQueue.Instance.createJob({ JobQueue.Instance.createJob({
type: 'video-live-ending', type: 'video-live-ending',
payload: { payload: {
videoId: fullVideo.id, videoId: fullVideo.id,
replayDirectory: live.saveReplay replayDirectory: live.saveReplay
? await this.findReplayDirectory(fullVideo) ? await this.findReplayDirectory(fullVideo)
: undefined, : undefined,
liveSessionId: liveSession.id,
publishedAt: fullVideo.publishedAt.toISOString() publishedAt: fullVideo.publishedAt.toISOString()
} }
}, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY }) }, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY })
@ -445,6 +470,23 @@ class LiveManager {
return playlist.save() return playlist.save()
} }
private saveStartingSession (videoLive: MVideoLiveVideo) {
const liveSession = new VideoLiveSessionModel({
startDate: new Date(),
liveVideoId: videoLive.videoId
})
return liveSession.save()
}
private async saveEndingSession (videoId: number, error: LiveVideoError | null) {
const liveSession = await VideoLiveSessionModel.findCurrentSessionOf(videoId)
liveSession.endDate = new Date()
liveSession.error = error
return liveSession.save()
}
static get Instance () { static get Instance () {
return this.instance || (this.instance = new this()) return this.instance || (this.instance = new this())
} }

View File

@ -28,7 +28,7 @@ interface MuxingSessionEvents {
'quota-exceeded': ({ videoId: number }) => void 'quota-exceeded': ({ videoId: number }) => void
'ffmpeg-end': ({ videoId: number }) => void 'ffmpeg-end': ({ videoId: number }) => void
'ffmpeg-error': ({ sessionId: string }) => void 'ffmpeg-error': ({ videoId: string }) => void
'after-cleanup': ({ videoId: number }) => void 'after-cleanup': ({ videoId: number }) => void
} }
@ -164,7 +164,11 @@ class MuxingSession extends EventEmitter {
this.onFFmpegError({ err, stdout, stderr, outPath: this.outDirectory, ffmpegShellCommand }) this.onFFmpegError({ err, stdout, stderr, outPath: this.outDirectory, ffmpegShellCommand })
}) })
this.ffmpegCommand.on('end', () => this.onFFmpegEnded(this.outDirectory)) this.ffmpegCommand.on('end', () => {
this.emit('ffmpeg-end', ({ videoId: this.videoId }))
this.onFFmpegEnded(this.outDirectory)
})
this.ffmpegCommand.run() this.ffmpegCommand.run()
} }
@ -197,7 +201,7 @@ class MuxingSession extends EventEmitter {
logger.error('Live transcoding error.', { err, stdout, stderr, ffmpegShellCommand, ...this.lTags() }) logger.error('Live transcoding error.', { err, stdout, stderr, ffmpegShellCommand, ...this.lTags() })
this.emit('ffmpeg-error', ({ sessionId: this.sessionId })) this.emit('ffmpeg-error', ({ videoId: this.videoId }))
} }
private onFFmpegEnded (outPath: string) { private onFFmpegEnded (outPath: string) {

View File

@ -9,7 +9,7 @@ import {
MVideoFullLight, MVideoFullLight,
MVideoWithBlacklistLight MVideoWithBlacklistLight
} from '@server/types/models' } from '@server/types/models'
import { UserRight, VideoBlacklistCreate, VideoBlacklistType } from '../../shared/models' import { LiveVideoError, UserRight, VideoBlacklistCreate, VideoBlacklistType } from '../../shared/models'
import { UserAdminFlag } from '../../shared/models/users/user-flag.model' import { UserAdminFlag } from '../../shared/models/users/user-flag.model'
import { logger, loggerTagsFactory } from '../helpers/logger' import { logger, loggerTagsFactory } from '../helpers/logger'
import { CONFIG } from '../initializers/config' import { CONFIG } from '../initializers/config'
@ -81,7 +81,7 @@ async function blacklistVideo (videoInstance: MVideoAccountLight, options: Video
} }
if (videoInstance.isLive) { if (videoInstance.isLive) {
LiveManager.Instance.stopSessionOf(videoInstance.id) LiveManager.Instance.stopSessionOf(videoInstance.id, LiveVideoError.BLACKLISTED)
} }
Notifier.Instance.notifyOnVideoBlacklist(blacklist) Notifier.Instance.notifyOnVideoBlacklist(blacklist)

View File

@ -126,12 +126,10 @@ async function moveToPublishedState (options: {
const { video, isNewVideo, transaction, previousVideoState } = options const { video, isNewVideo, transaction, previousVideoState } = options
const previousState = previousVideoState ?? video.state const previousState = previousVideoState ?? video.state
logger.info('Publishing video %s.', video.uuid, { previousState, tags: [ video.uuid ] }) logger.info('Publishing video %s.', video.uuid, { isNewVideo, previousState, tags: [ video.uuid ] })
await video.setNewState(VideoState.PUBLISHED, isNewVideo, transaction) await video.setNewState(VideoState.PUBLISHED, isNewVideo, transaction)
// If the video was not published, we consider it is a new one for other instances
// Live videos are always federated, so it's not a new video
await federateVideoIfNeeded(video, isNewVideo, transaction) await federateVideoIfNeeded(video, isNewVideo, transaction)
if (previousState === VideoState.TO_EDIT) { if (previousState === VideoState.TO_EDIT) {

View File

@ -28,6 +28,7 @@ import {
isValidVideoIdParam isValidVideoIdParam
} from '../shared' } from '../shared'
import { getCommonVideoEditAttributes } from './videos' import { getCommonVideoEditAttributes } from './videos'
import { VideoLiveSessionModel } from '@server/models/video/video-live-session'
const videoLiveGetValidator = [ const videoLiveGetValidator = [
isValidVideoIdParam('videoId'), isValidVideoIdParam('videoId'),
@ -196,11 +197,48 @@ const videoLiveUpdateValidator = [
} }
] ]
const videoLiveListSessionsValidator = [
(req: express.Request, res: express.Response, next: express.NextFunction) => {
logger.debug('Checking videoLiveListSessionsValidator parameters', { parameters: req.params })
// Check the user can manage the live
const user = res.locals.oauth.token.User
if (!checkUserCanManageVideo(user, res.locals.videoAll, UserRight.GET_ANY_LIVE, res)) return
return next()
}
]
const videoLiveFindReplaySessionValidator = [
isValidVideoIdParam('videoId'),
async (req: express.Request, res: express.Response, next: express.NextFunction) => {
logger.debug('Checking videoLiveFindReplaySessionValidator parameters', { parameters: req.params })
if (areValidationErrors(req, res)) return
if (!await doesVideoExist(req.params.videoId, res, 'id')) return
const session = await VideoLiveSessionModel.findSessionOfReplay(res.locals.videoId.id)
if (!session) {
return res.fail({
status: HttpStatusCode.NOT_FOUND_404,
message: 'No live replay found'
})
}
res.locals.videoLiveSession = session
return next()
}
]
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
export { export {
videoLiveAddValidator, videoLiveAddValidator,
videoLiveUpdateValidator, videoLiveUpdateValidator,
videoLiveListSessionsValidator,
videoLiveFindReplaySessionValidator,
videoLiveGetValidator videoLiveGetValidator
} }

View File

@ -0,0 +1,142 @@
import { FindOptions } from 'sequelize'
import { AllowNull, BelongsTo, Column, CreatedAt, DataType, ForeignKey, Model, Scopes, Table, UpdatedAt } from 'sequelize-typescript'
import { MVideoLiveSession, MVideoLiveSessionReplay } from '@server/types/models'
import { uuidToShort } from '@shared/extra-utils'
import { LiveVideoError, LiveVideoSession } from '@shared/models'
import { AttributesOnly } from '@shared/typescript-utils'
import { VideoModel } from './video'
export enum ScopeNames {
WITH_REPLAY = 'WITH_REPLAY'
}
@Scopes(() => ({
[ScopeNames.WITH_REPLAY]: {
include: [
{
model: VideoModel.unscoped(),
as: 'ReplayVideo',
required: false
}
]
}
}))
@Table({
tableName: 'videoLiveSession',
indexes: [
{
fields: [ 'replayVideoId' ],
unique: true
},
{
fields: [ 'liveVideoId' ]
}
]
})
export class VideoLiveSessionModel extends Model<Partial<AttributesOnly<VideoLiveSessionModel>>> {
@CreatedAt
createdAt: Date
@UpdatedAt
updatedAt: Date
@AllowNull(false)
@Column(DataType.DATE)
startDate: Date
@AllowNull(true)
@Column(DataType.DATE)
endDate: Date
@AllowNull(true)
@Column
error: LiveVideoError
@ForeignKey(() => VideoModel)
@Column
replayVideoId: number
@BelongsTo(() => VideoModel, {
foreignKey: {
allowNull: true,
name: 'replayVideoId'
},
as: 'ReplayVideo',
onDelete: 'set null'
})
ReplayVideo: VideoModel
@ForeignKey(() => VideoModel)
@Column
liveVideoId: number
@BelongsTo(() => VideoModel, {
foreignKey: {
allowNull: true,
name: 'liveVideoId'
},
as: 'LiveVideo',
onDelete: 'set null'
})
LiveVideo: VideoModel
static load (id: number): Promise<MVideoLiveSession> {
return VideoLiveSessionModel.findOne({
where: { id }
})
}
static findSessionOfReplay (replayVideoId: number) {
const query = {
where: {
replayVideoId
}
}
return VideoLiveSessionModel.scope(ScopeNames.WITH_REPLAY).findOne(query)
}
static findCurrentSessionOf (videoId: number) {
return VideoLiveSessionModel.findOne({
where: {
liveVideoId: videoId,
endDate: null
},
order: [ [ 'startDate', 'DESC' ] ]
})
}
static listSessionsOfLiveForAPI (options: { videoId: number }) {
const { videoId } = options
const query: FindOptions<VideoLiveSessionModel> = {
where: {
liveVideoId: videoId
},
order: [ [ 'startDate', 'ASC' ] ]
}
return VideoLiveSessionModel.scope(ScopeNames.WITH_REPLAY).findAll(query)
}
toFormattedJSON (this: MVideoLiveSessionReplay): LiveVideoSession {
const replayVideo = this.ReplayVideo
? {
id: this.ReplayVideo.id,
uuid: this.ReplayVideo.uuid,
shortUUID: uuidToShort(this.ReplayVideo.uuid)
}
: undefined
return {
id: this.id,
startDate: this.startDate.toISOString(),
endDate: this.endDate
? this.endDate.toISOString()
: null,
replayVideo,
error: this.error
}
}
}

View File

@ -787,7 +787,7 @@ export class VideoModel extends Model<Partial<AttributesOnly<VideoModel>>> {
logger.info('Stopping live of video %s after video deletion.', instance.uuid) logger.info('Stopping live of video %s after video deletion.', instance.uuid)
LiveManager.Instance.stopSessionOf(instance.id) LiveManager.Instance.stopSessionOf(instance.id, null)
} }
@BeforeDestroy @BeforeDestroy

View File

@ -388,6 +388,52 @@ describe('Test video lives API validator', function () {
}) })
}) })
describe('When getting live sessions', function () {
it('Should fail with a bad access token', async function () {
await command.listSessions({ token: 'toto', videoId: video.id, expectedStatus: HttpStatusCode.UNAUTHORIZED_401 })
})
it('Should fail without token', async function () {
await command.listSessions({ token: null, videoId: video.id, expectedStatus: HttpStatusCode.UNAUTHORIZED_401 })
})
it('Should fail with the token of another user', async function () {
await command.listSessions({ token: userAccessToken, videoId: video.id, expectedStatus: HttpStatusCode.FORBIDDEN_403 })
})
it('Should fail with a bad video id', async function () {
await command.listSessions({ videoId: 'toto', expectedStatus: HttpStatusCode.BAD_REQUEST_400 })
})
it('Should fail with an unknown video id', async function () {
await command.listSessions({ videoId: 454555, expectedStatus: HttpStatusCode.NOT_FOUND_404 })
})
it('Should fail with a non live video', async function () {
await command.listSessions({ videoId: videoIdNotLive, expectedStatus: HttpStatusCode.NOT_FOUND_404 })
})
it('Should succeed with the correct params', async function () {
await command.listSessions({ videoId: video.id })
})
})
describe('When getting live session of a replay', function () {
it('Should fail with a bad video id', async function () {
await command.getReplaySession({ videoId: 'toto', expectedStatus: HttpStatusCode.BAD_REQUEST_400 })
})
it('Should fail with an unknown video id', async function () {
await command.getReplaySession({ videoId: 454555, expectedStatus: HttpStatusCode.NOT_FOUND_404 })
})
it('Should fail with a non replay video', async function () {
await command.getReplaySession({ videoId: videoIdNotLive, expectedStatus: HttpStatusCode.NOT_FOUND_404 })
})
})
describe('When updating live information', async function () { describe('When updating live information', async function () {
it('Should fail without access token', async function () { it('Should fail without access token', async function () {

View File

@ -3,7 +3,7 @@
import 'mocha' import 'mocha'
import * as chai from 'chai' import * as chai from 'chai'
import { wait } from '@shared/core-utils' import { wait } from '@shared/core-utils'
import { VideoPrivacy } from '@shared/models' import { LiveVideoError, VideoPrivacy } from '@shared/models'
import { import {
cleanupTests, cleanupTests,
ConfigCommand, ConfigCommand,
@ -12,7 +12,8 @@ import {
PeerTubeServer, PeerTubeServer,
setAccessTokensToServers, setAccessTokensToServers,
setDefaultVideoChannel, setDefaultVideoChannel,
waitJobs waitJobs,
waitUntilLiveWaitingOnAllServers
} from '@shared/server-commands' } from '@shared/server-commands'
import { checkLiveCleanup } from '../../shared' import { checkLiveCleanup } from '../../shared'
@ -24,12 +25,18 @@ describe('Test live constraints', function () {
let userAccessToken: string let userAccessToken: string
let userChannelId: number let userChannelId: number
async function createLiveWrapper (saveReplay: boolean) { async function createLiveWrapper (options: {
replay: boolean
permanent: boolean
}) {
const { replay, permanent } = options
const liveAttributes = { const liveAttributes = {
name: 'user live', name: 'user live',
channelId: userChannelId, channelId: userChannelId,
privacy: VideoPrivacy.PUBLIC, privacy: VideoPrivacy.PUBLIC,
saveReplay saveReplay: replay,
permanentLive: permanent
} }
const { uuid } = await servers[0].live.create({ token: userAccessToken, fields: liveAttributes }) const { uuid } = await servers[0].live.create({ token: userAccessToken, fields: liveAttributes })
@ -97,23 +104,42 @@ describe('Test live constraints', function () {
it('Should not have size limit if save replay is disabled', async function () { it('Should not have size limit if save replay is disabled', async function () {
this.timeout(60000) this.timeout(60000)
const userVideoLiveoId = await createLiveWrapper(false) const userVideoLiveoId = await createLiveWrapper({ replay: false, permanent: false })
await servers[0].live.runAndTestStreamError({ token: userAccessToken, videoId: userVideoLiveoId, shouldHaveError: false }) await servers[0].live.runAndTestStreamError({ token: userAccessToken, videoId: userVideoLiveoId, shouldHaveError: false })
}) })
it('Should have size limit depending on user global quota if save replay is enabled', async function () { it('Should have size limit depending on user global quota if save replay is enabled on non permanent live', async function () {
this.timeout(60000) this.timeout(60000)
// Wait for user quota memoize cache invalidation // Wait for user quota memoize cache invalidation
await wait(5000) await wait(5000)
const userVideoLiveoId = await createLiveWrapper(true) const userVideoLiveoId = await createLiveWrapper({ replay: true, permanent: false })
await servers[0].live.runAndTestStreamError({ token: userAccessToken, videoId: userVideoLiveoId, shouldHaveError: true }) await servers[0].live.runAndTestStreamError({ token: userAccessToken, videoId: userVideoLiveoId, shouldHaveError: true })
await waitUntilLivePublishedOnAllServers(userVideoLiveoId) await waitUntilLivePublishedOnAllServers(userVideoLiveoId)
await waitJobs(servers) await waitJobs(servers)
await checkSaveReplay(userVideoLiveoId) await checkSaveReplay(userVideoLiveoId)
const session = await servers[0].live.getReplaySession({ videoId: userVideoLiveoId })
expect(session.error).to.equal(LiveVideoError.QUOTA_EXCEEDED)
})
it('Should have size limit depending on user global quota if save replay is enabled on a permanent live', async function () {
this.timeout(60000)
// Wait for user quota memoize cache invalidation
await wait(5000)
const userVideoLiveoId = await createLiveWrapper({ replay: true, permanent: true })
await servers[0].live.runAndTestStreamError({ token: userAccessToken, videoId: userVideoLiveoId, shouldHaveError: true })
await waitJobs(servers)
await waitUntilLiveWaitingOnAllServers(servers, userVideoLiveoId)
const session = await servers[0].live.findLatestSession({ videoId: userVideoLiveoId })
expect(session.error).to.equal(LiveVideoError.QUOTA_EXCEEDED)
}) })
it('Should have size limit depending on user daily quota if save replay is enabled', async function () { it('Should have size limit depending on user daily quota if save replay is enabled', async function () {
@ -124,13 +150,16 @@ describe('Test live constraints', function () {
await updateQuota({ total: -1, daily: 1 }) await updateQuota({ total: -1, daily: 1 })
const userVideoLiveoId = await createLiveWrapper(true) const userVideoLiveoId = await createLiveWrapper({ replay: true, permanent: false })
await servers[0].live.runAndTestStreamError({ token: userAccessToken, videoId: userVideoLiveoId, shouldHaveError: true }) await servers[0].live.runAndTestStreamError({ token: userAccessToken, videoId: userVideoLiveoId, shouldHaveError: true })
await waitUntilLivePublishedOnAllServers(userVideoLiveoId) await waitUntilLivePublishedOnAllServers(userVideoLiveoId)
await waitJobs(servers) await waitJobs(servers)
await checkSaveReplay(userVideoLiveoId) await checkSaveReplay(userVideoLiveoId)
const session = await servers[0].live.getReplaySession({ videoId: userVideoLiveoId })
expect(session.error).to.equal(LiveVideoError.QUOTA_EXCEEDED)
}) })
it('Should succeed without quota limit', async function () { it('Should succeed without quota limit', async function () {
@ -141,7 +170,7 @@ describe('Test live constraints', function () {
await updateQuota({ total: 10 * 1000 * 1000, daily: -1 }) await updateQuota({ total: 10 * 1000 * 1000, daily: -1 })
const userVideoLiveoId = await createLiveWrapper(true) const userVideoLiveoId = await createLiveWrapper({ replay: true, permanent: false })
await servers[0].live.runAndTestStreamError({ token: userAccessToken, videoId: userVideoLiveoId, shouldHaveError: false }) await servers[0].live.runAndTestStreamError({ token: userAccessToken, videoId: userVideoLiveoId, shouldHaveError: false })
}) })
@ -162,13 +191,16 @@ describe('Test live constraints', function () {
} }
}) })
const userVideoLiveoId = await createLiveWrapper(true) const userVideoLiveoId = await createLiveWrapper({ replay: true, permanent: false })
await servers[0].live.runAndTestStreamError({ token: userAccessToken, videoId: userVideoLiveoId, shouldHaveError: true }) await servers[0].live.runAndTestStreamError({ token: userAccessToken, videoId: userVideoLiveoId, shouldHaveError: true })
await waitUntilLivePublishedOnAllServers(userVideoLiveoId) await waitUntilLivePublishedOnAllServers(userVideoLiveoId)
await waitJobs(servers) await waitJobs(servers)
await checkSaveReplay(userVideoLiveoId, [ 720, 480, 360, 240, 144 ]) await checkSaveReplay(userVideoLiveoId, [ 720, 480, 360, 240, 144 ])
const session = await servers[0].live.getReplaySession({ videoId: userVideoLiveoId })
expect(session.error).to.equal(LiveVideoError.DURATION_EXCEEDED)
}) })
after(async function () { after(async function () {

View File

@ -172,6 +172,23 @@ describe('Permanent live', function () {
await stopFfmpeg(ffmpegCommand) await stopFfmpeg(ffmpegCommand)
}) })
it('Should have appropriate sessions', async function () {
this.timeout(60000)
await servers[0].live.waitUntilWaiting({ videoId: videoUUID })
const { data, total } = await servers[0].live.listSessions({ videoId: videoUUID })
expect(total).to.equal(2)
expect(data).to.have.lengthOf(2)
for (const session of data) {
expect(session.startDate).to.exist
expect(session.endDate).to.exist
expect(session.error).to.not.exist
}
})
after(async function () { after(async function () {
await cleanupTests(servers) await cleanupTests(servers)
}) })

View File

@ -5,7 +5,7 @@ import * as chai from 'chai'
import { FfmpegCommand } from 'fluent-ffmpeg' import { FfmpegCommand } from 'fluent-ffmpeg'
import { checkLiveCleanup } from '@server/tests/shared' import { checkLiveCleanup } from '@server/tests/shared'
import { wait } from '@shared/core-utils' import { wait } from '@shared/core-utils'
import { HttpStatusCode, LiveVideoCreate, VideoPrivacy, VideoState } from '@shared/models' import { HttpStatusCode, LiveVideoCreate, LiveVideoError, VideoPrivacy, VideoState } from '@shared/models'
import { import {
cleanupTests, cleanupTests,
ConfigCommand, ConfigCommand,
@ -143,6 +143,9 @@ describe('Save replay setting', function () {
}) })
describe('With save replay disabled', function () { describe('With save replay disabled', function () {
let sessionStartDateMin: Date
let sessionStartDateMax: Date
let sessionEndDateMin: Date
it('Should correctly create and federate the "waiting for stream" live', async function () { it('Should correctly create and federate the "waiting for stream" live', async function () {
this.timeout(20000) this.timeout(20000)
@ -160,7 +163,9 @@ describe('Save replay setting', function () {
ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoUUID }) ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoUUID })
sessionStartDateMin = new Date()
await waitUntilLivePublishedOnAllServers(servers, liveVideoUUID) await waitUntilLivePublishedOnAllServers(servers, liveVideoUUID)
sessionStartDateMax = new Date()
await waitJobs(servers) await waitJobs(servers)
@ -171,6 +176,7 @@ describe('Save replay setting', function () {
it('Should correctly delete the video files after the stream ended', async function () { it('Should correctly delete the video files after the stream ended', async function () {
this.timeout(40000) this.timeout(40000)
sessionEndDateMin = new Date()
await stopFfmpeg(ffmpegCommand) await stopFfmpeg(ffmpegCommand)
for (const server of servers) { for (const server of servers) {
@ -186,6 +192,24 @@ describe('Save replay setting', function () {
await checkLiveCleanup(servers[0], liveVideoUUID, []) await checkLiveCleanup(servers[0], liveVideoUUID, [])
}) })
it('Should have appropriate ended session', async function () {
const { data, total } = await servers[0].live.listSessions({ videoId: liveVideoUUID })
expect(total).to.equal(1)
expect(data).to.have.lengthOf(1)
const session = data[0]
const startDate = new Date(session.startDate)
expect(startDate).to.be.above(sessionStartDateMin)
expect(startDate).to.be.below(sessionStartDateMax)
expect(session.endDate).to.exist
expect(new Date(session.endDate)).to.be.above(sessionEndDateMin)
expect(session.error).to.not.exist
expect(session.replayVideo).to.not.exist
})
it('Should correctly terminate the stream on blacklist and delete the live', async function () { it('Should correctly terminate the stream on blacklist and delete the live', async function () {
this.timeout(40000) this.timeout(40000)
@ -201,6 +225,15 @@ describe('Save replay setting', function () {
await checkLiveCleanup(servers[0], liveVideoUUID, []) await checkLiveCleanup(servers[0], liveVideoUUID, [])
}) })
it('Should have blacklisted session error', async function () {
const session = await servers[0].live.findLatestSession({ videoId: liveVideoUUID })
expect(session.startDate).to.exist
expect(session.endDate).to.exist
expect(session.error).to.equal(LiveVideoError.BLACKLISTED)
expect(session.replayVideo).to.not.exist
})
it('Should correctly terminate the stream on delete and delete the video', async function () { it('Should correctly terminate the stream on delete and delete the video', async function () {
this.timeout(40000) this.timeout(40000)
@ -249,6 +282,22 @@ describe('Save replay setting', function () {
await checkVideoState(liveVideoUUID, VideoState.PUBLISHED) await checkVideoState(liveVideoUUID, VideoState.PUBLISHED)
}) })
it('Should find the replay live session', async function () {
const session = await servers[0].live.getReplaySession({ videoId: liveVideoUUID })
expect(session).to.exist
expect(session.startDate).to.exist
expect(session.endDate).to.exist
expect(session.error).to.not.exist
expect(session.replayVideo).to.exist
expect(session.replayVideo.id).to.exist
expect(session.replayVideo.shortUUID).to.exist
expect(session.replayVideo.uuid).to.equal(liveVideoUUID)
})
it('Should update the saved live and correctly federate the updated attributes', async function () { it('Should update the saved live and correctly federate the updated attributes', async function () {
this.timeout(30000) this.timeout(30000)
@ -337,6 +386,27 @@ describe('Save replay setting', function () {
lastReplayUUID = video.uuid lastReplayUUID = video.uuid
}) })
it('Should have appropriate ended session and replay live session', async function () {
const { data, total } = await servers[0].live.listSessions({ videoId: liveVideoUUID })
expect(total).to.equal(1)
expect(data).to.have.lengthOf(1)
const sessionFromLive = data[0]
const sessionFromReplay = await servers[0].live.getReplaySession({ videoId: lastReplayUUID })
for (const session of [ sessionFromLive, sessionFromReplay ]) {
expect(session.startDate).to.exist
expect(session.endDate).to.exist
expect(session.error).to.not.exist
expect(session.replayVideo).to.exist
expect(session.replayVideo.id).to.exist
expect(session.replayVideo.shortUUID).to.exist
expect(session.replayVideo.uuid).to.equal(lastReplayUUID)
}
})
it('Should have cleaned up the live files', async function () { it('Should have cleaned up the live files', async function () {
await checkLiveCleanup(servers[0], liveVideoUUID, []) await checkLiveCleanup(servers[0], liveVideoUUID, [])
}) })

View File

@ -594,6 +594,8 @@ describe('Test live', function () {
let permanentLiveReplayName: string let permanentLiveReplayName: string
let beforeServerRestart: Date
async function createLiveWrapper (options: { saveReplay: boolean, permanent: boolean }) { async function createLiveWrapper (options: { saveReplay: boolean, permanent: boolean }) {
const liveAttributes: LiveVideoCreate = { const liveAttributes: LiveVideoCreate = {
name: 'live video', name: 'live video',
@ -636,6 +638,8 @@ describe('Test live', function () {
} }
await killallServers([ servers[0] ]) await killallServers([ servers[0] ])
beforeServerRestart = new Date()
await servers[0].run() await servers[0].run()
await wait(5000) await wait(5000)
@ -653,6 +657,10 @@ describe('Test live', function () {
this.timeout(120000) this.timeout(120000)
await commands[0].waitUntilPublished({ videoId: liveVideoReplayId }) await commands[0].waitUntilPublished({ videoId: liveVideoReplayId })
const session = await commands[0].getReplaySession({ videoId: liveVideoReplayId })
expect(session.endDate).to.exist
expect(new Date(session.endDate)).to.be.above(beforeServerRestart)
}) })
it('Should have saved a permanent live replay', async function () { it('Should have saved a permanent live replay', async function () {

View File

@ -7,8 +7,8 @@ import {
checkMyVideoImportIsFinished, checkMyVideoImportIsFinished,
checkNewActorFollow, checkNewActorFollow,
checkNewVideoFromSubscription, checkNewVideoFromSubscription,
checkVideoStudioEditionIsFinished,
checkVideoIsPublished, checkVideoIsPublished,
checkVideoStudioEditionIsFinished,
FIXTURE_URLS, FIXTURE_URLS,
MockSmtpServer, MockSmtpServer,
prepareNotificationsTest, prepareNotificationsTest,
@ -16,8 +16,8 @@ import {
} from '@server/tests/shared' } from '@server/tests/shared'
import { wait } from '@shared/core-utils' import { wait } from '@shared/core-utils'
import { buildUUID } from '@shared/extra-utils' import { buildUUID } from '@shared/extra-utils'
import { UserNotification, UserNotificationType, VideoStudioTask, VideoPrivacy } from '@shared/models' import { UserNotification, UserNotificationType, VideoPrivacy, VideoStudioTask } from '@shared/models'
import { cleanupTests, PeerTubeServer, waitJobs } from '@shared/server-commands' import { cleanupTests, findExternalSavedVideo, PeerTubeServer, stopFfmpeg, waitJobs } from '@shared/server-commands'
const expect = chai.expect const expect = chai.expect
@ -323,6 +323,76 @@ describe('Test user notifications', function () {
}) })
}) })
describe('My live replay is published', function () {
let baseParams: CheckerBaseParams
before(() => {
baseParams = {
server: servers[1],
emails,
socketNotifications: adminNotificationsServer2,
token: servers[1].accessToken
}
})
it('Should send a notification is a live replay of a non permanent live is published', async function () {
this.timeout(120000)
const { shortUUID } = await servers[1].live.create({
fields: {
name: 'non permanent live',
privacy: VideoPrivacy.PUBLIC,
channelId: servers[1].store.channel.id,
saveReplay: true,
permanentLive: false
}
})
const ffmpegCommand = await servers[1].live.sendRTMPStreamInVideo({ videoId: shortUUID })
await waitJobs(servers)
await servers[1].live.waitUntilPublished({ videoId: shortUUID })
await stopFfmpeg(ffmpegCommand)
await servers[1].live.waitUntilReplacedByReplay({ videoId: shortUUID })
await waitJobs(servers)
await checkVideoIsPublished({ ...baseParams, videoName: 'non permanent live', shortUUID, checkType: 'presence' })
})
it('Should send a notification is a live replay of a permanent live is published', async function () {
this.timeout(120000)
const { shortUUID } = await servers[1].live.create({
fields: {
name: 'permanent live',
privacy: VideoPrivacy.PUBLIC,
channelId: servers[1].store.channel.id,
saveReplay: true,
permanentLive: true
}
})
const ffmpegCommand = await servers[1].live.sendRTMPStreamInVideo({ videoId: shortUUID })
await waitJobs(servers)
await servers[1].live.waitUntilPublished({ videoId: shortUUID })
const liveDetails = await servers[1].videos.get({ id: shortUUID })
await stopFfmpeg(ffmpegCommand)
await servers[1].live.waitUntilWaiting({ videoId: shortUUID })
await waitJobs(servers)
const video = await findExternalSavedVideo(servers[1], liveDetails)
expect(video).to.exist
await checkVideoIsPublished({ ...baseParams, videoName: video.name, shortUUID: video.shortUUID, checkType: 'presence' })
})
})
describe('Video studio', function () { describe('Video studio', function () {
let baseParams: CheckerBaseParams let baseParams: CheckerBaseParams

View File

@ -16,7 +16,8 @@ import {
PeerTubeServer, PeerTubeServer,
setAccessTokensToServers, setAccessTokensToServers,
setDefaultAccountAvatar, setDefaultAccountAvatar,
setDefaultChannelAvatar setDefaultChannelAvatar,
setDefaultVideoChannel
} from '@shared/server-commands' } from '@shared/server-commands'
import { MockSmtpServer } from './mock-servers' import { MockSmtpServer } from './mock-servers'
@ -682,10 +683,14 @@ async function prepareNotificationsTest (serversCount = 3, overrideConfigArg: an
const servers = await createMultipleServers(serversCount, Object.assign(overrideConfig, overrideConfigArg)) const servers = await createMultipleServers(serversCount, Object.assign(overrideConfig, overrideConfigArg))
await setAccessTokensToServers(servers) await setAccessTokensToServers(servers)
await setDefaultVideoChannel(servers)
await setDefaultChannelAvatar(servers) await setDefaultChannelAvatar(servers)
await setDefaultAccountAvatar(servers) await setDefaultAccountAvatar(servers)
if (servers[1]) await servers[1].config.enableStudio() if (servers[1]) {
await servers[1].config.enableStudio()
await servers[1].config.enableLive({ allowReplay: true, transcoding: false })
}
if (serversCount > 1) { if (serversCount > 1) {
await doubleFollow(servers[0], servers[1]) await doubleFollow(servers[0], servers[1])

View File

@ -119,6 +119,7 @@ declare module 'express' {
videoId?: MVideoId videoId?: MVideoId
videoLive?: MVideoLive videoLive?: MVideoLive
videoLiveSession?: MVideoLiveSession
videoShare?: MVideoShareActor videoShare?: MVideoShareActor

View File

@ -1,4 +1,5 @@
export * from './local-video-viewer-watch-section' export * from './local-video-viewer-watch-section'
export * from './local-video-viewer-watch-section'
export * from './local-video-viewer' export * from './local-video-viewer'
export * from './schedule-video-update' export * from './schedule-video-update'
export * from './tag' export * from './tag'
@ -11,6 +12,7 @@ export * from './video-channels'
export * from './video-comment' export * from './video-comment'
export * from './video-file' export * from './video-file'
export * from './video-import' export * from './video-import'
export * from './video-live-session'
export * from './video-live' export * from './video-live'
export * from './video-playlist' export * from './video-playlist'
export * from './video-playlist-element' export * from './video-playlist-element'

View File

@ -0,0 +1,15 @@
import { VideoLiveSessionModel } from '@server/models/video/video-live-session'
import { PickWith } from '@shared/typescript-utils'
import { MVideo } from './video'
type Use<K extends keyof VideoLiveSessionModel, M> = PickWith<VideoLiveSessionModel, K, M>
// ############################################################################
export type MVideoLiveSession = Omit<VideoLiveSessionModel, 'Video' | 'VideoLive'>
// ############################################################################
export type MVideoLiveSessionReplay =
MVideoLiveSession &
Use<'ReplayVideo', MVideo>

View File

@ -160,6 +160,7 @@ export type VideoTranscodingPayload =
export interface VideoLiveEndingPayload { export interface VideoLiveEndingPayload {
videoId: number videoId: number
publishedAt: string publishedAt: string
liveSessionId: number
replayDirectory?: string replayDirectory?: string
} }

View File

@ -1,6 +1,8 @@
export * from './live-video-create.model' export * from './live-video-create.model'
export * from './live-video-error.enum'
export * from './live-video-event-payload.model' export * from './live-video-event-payload.model'
export * from './live-video-event.type' export * from './live-video-event.type'
export * from './live-video-latency-mode.enum' export * from './live-video-latency-mode.enum'
export * from './live-video-session.model'
export * from './live-video-update.model' export * from './live-video-update.model'
export * from './live-video.model' export * from './live-video.model'

View File

@ -0,0 +1,7 @@
export const enum LiveVideoError {
BAD_SOCKET_HEALTH = 1,
DURATION_EXCEEDED = 2,
QUOTA_EXCEEDED = 3,
FFMPEG_ERROR = 4,
BLACKLISTED = 5
}

View File

@ -0,0 +1,16 @@
import { LiveVideoError } from './live-video-error.enum'
export interface LiveVideoSession {
id: number
startDate: string
endDate: string
error: LiveVideoError
replayVideo: {
id: number
uuid: string
shortUUID: string
}
}

View File

@ -4,7 +4,17 @@ import { readdir } from 'fs-extra'
import { omit } from 'lodash' import { omit } from 'lodash'
import { join } from 'path' import { join } from 'path'
import { wait } from '@shared/core-utils' import { wait } from '@shared/core-utils'
import { HttpStatusCode, LiveVideo, LiveVideoCreate, LiveVideoUpdate, VideoCreateResult, VideoDetails, VideoState } from '@shared/models' import {
HttpStatusCode,
LiveVideo,
LiveVideoCreate,
LiveVideoSession,
LiveVideoUpdate,
ResultList,
VideoCreateResult,
VideoDetails,
VideoState
} from '@shared/models'
import { unwrapBody } from '../requests' import { unwrapBody } from '../requests'
import { AbstractCommand, OverrideCommandOptions } from '../shared' import { AbstractCommand, OverrideCommandOptions } from '../shared'
import { sendRTMPStream, testFfmpegStreamError } from './live' import { sendRTMPStream, testFfmpegStreamError } from './live'
@ -25,6 +35,42 @@ export class LiveCommand extends AbstractCommand {
}) })
} }
listSessions (options: OverrideCommandOptions & {
videoId: number | string
}) {
const path = `/api/v1/videos/live/${options.videoId}/sessions`
return this.getRequestBody<ResultList<LiveVideoSession>>({
...options,
path,
implicitToken: true,
defaultExpectedStatus: HttpStatusCode.OK_200
})
}
async findLatestSession (options: OverrideCommandOptions & {
videoId: number | string
}) {
const { data: sessions } = await this.listSessions(options)
return sessions[sessions.length - 1]
}
getReplaySession (options: OverrideCommandOptions & {
videoId: number | string
}) {
const path = `/api/v1/videos/${options.videoId}/live-session`
return this.getRequestBody<LiveVideoSession>({
...options,
path,
implicitToken: true,
defaultExpectedStatus: HttpStatusCode.OK_200
})
}
update (options: OverrideCommandOptions & { update (options: OverrideCommandOptions & {
videoId: number | string videoId: number | string
fields: LiveVideoUpdate fields: LiveVideoUpdate

View File

@ -2462,6 +2462,48 @@ paths:
description: bad parameters or trying to update a live that has already started description: bad parameters or trying to update a live that has already started
'403': '403':
description: trying to save replay of the live but saving replay is not enabled on the instance description: trying to save replay of the live but saving replay is not enabled on the instance
/videos/live/{id}/sessions:
get:
summary: List live sessions
description: List all sessions created in a particular live
security:
- OAuth2: []
tags:
- Live Videos
parameters:
- $ref: '#/components/parameters/idOrUUID'
responses:
'200':
description: successful operation
content:
application/json:
schema:
type: object
properties:
total:
type: integer
example: 1
data:
type: array
items:
$ref: '#/components/schemas/LiveVideoSessionResponse'
/videos/{id}/live-session:
get:
summary: Get live session of a replay
description: If the video is a replay of a live, you can find the associated live session using this endpoint
security:
- OAuth2: []
tags:
- Live Videos
parameters:
- $ref: '#/components/parameters/idOrUUID'
responses:
'200':
description: successful operation
content:
application/json:
schema:
$ref: '#/components/schemas/LiveVideoSessionResponse'
/users/me/abuses: /users/me/abuses:
get: get:
@ -7673,6 +7715,46 @@ components:
description: User can select live latency mode if enabled by the instance description: User can select live latency mode if enabled by the instance
$ref: '#/components/schemas/LiveVideoLatencyMode' $ref: '#/components/schemas/LiveVideoLatencyMode'
LiveVideoSessionResponse:
properties:
id:
type: integer
startDate:
type: string
format: date-time
description: Start date of the live session
endDate:
type: string
format: date-time
nullable: true
description: End date of the live session
error:
type: integer
enum:
- 1
- 2
- 3
- 4
- 5
nullable: true
description: >
Error type if an error occured during the live session:
- `1`: Bad socket health (transcoding is too slow)
- `2`: Max duration exceeded
- `3`: Quota exceeded
- `4`: Quota FFmpeg error
- `5`: Video has been blacklisted during the live
replayVideo:
type: object
description: Video replay information
properties:
id:
type: number
uuid:
$ref: '#/components/schemas/UUIDv4'
shortUUID:
$ref: '#/components/schemas/shortUUID'
callbacks: callbacks:
searchIndex: searchIndex:
'https://search.example.org/api/v1/search/videos': 'https://search.example.org/api/v1/search/videos':