Compare commits

...

4 Commits

Author SHA1 Message Date
Chocobozzz
2af050cbf3
Viewers federation protocol v2
More efficient than the current one where instance is not fast enough to
send all viewers if a video becomes popular

The new protocol can be enabled by setting env
USE_VIEWERS_FEDERATION_V2='true'

Introduce a result field in View activity that contains the number of
viewers. This field is used by the origin instance to send the total
viewers on the video to remote instances. The difference with the
current protocol is that we don't have to send viewers individually to
remote instances.

There are 4 cases:
 * View activity from federation on Remote Video -> instance replaces
   all current viewers by a new viewer that contains the result counter
 * View activity from federation on Local Video -> instance adds the
   viewer without considering the result counter
 * Local view on Remote Video -> instance adds the viewer and send it to
   the origin instance
 * Local view on Local Video -> instance adds the viewer

Periodically PeerTube cleanups expired viewers. On local videos, the
instance sends to remote instances a View activity with the result
counter so they can update their viewers counter for that particular
video
2023-12-01 15:21:17 +01:00
Chocobozzz
f2065858b0
We don't need cookies for REST API 2023-11-30 11:08:04 +01:00
Chocobozzz
56d711a863
Optimize views endpoint
Lazy write data in redis
2023-11-30 10:50:47 +01:00
Chocobozzz
bbf585aad0
Optimize async middleware
Avoid using bluebird, a simple for/await is enought
2023-11-30 09:53:52 +01:00
19 changed files with 407 additions and 194 deletions

View File

@ -116,6 +116,11 @@ export interface ActivityView extends BaseActivity {
// If sending a "viewer" event // If sending a "viewer" event
expires?: string expires?: string
result?: {
type: 'InteractionCounter'
interactionType: 'WatchAction'
userInteractionCount: number
}
} }
export interface ActivityDislike extends BaseActivity { export interface ActivityDislike extends BaseActivity {

View File

@ -18,6 +18,7 @@ export interface VideoObject {
licence: ActivityIdentifierObject licence: ActivityIdentifierObject
language: ActivityIdentifierObject language: ActivityIdentifierObject
subtitleLanguage: ActivityIdentifierObject[] subtitleLanguage: ActivityIdentifierObject[]
views: number views: number
sensitive: boolean sensitive: boolean

View File

@ -56,3 +56,7 @@ export function isProdInstance () {
export function getAppNumber () { export function getAppNumber () {
return process.env.NODE_APP_INSTANCE || '' return process.env.NODE_APP_INSTANCE || ''
} }
export function isUsingViewersFederationV2 () {
return process.env.USE_VIEWERS_FEDERATION_V2 === 'true'
}

View File

@ -21,133 +21,171 @@ describe('Test video views/viewers counters', function () {
} }
} }
before(async function () { function runTests () {
this.timeout(120000) describe('Test views counter on VOD', function () {
let videoUUID: string
servers = await prepareViewsServers() before(async function () {
}) this.timeout(120000)
describe('Test views counter on VOD', function () { const { uuid } = await servers[0].videos.quickUpload({ name: 'video' })
let videoUUID: string videoUUID = uuid
await waitJobs(servers)
})
it('Should not view a video if watch time is below the threshold', async function () {
await servers[0].views.simulateViewer({ id: videoUUID, currentTimes: [ 1, 2 ] })
await processViewsBuffer(servers)
await checkCounter('views', videoUUID, 0)
})
it('Should view a video if watch time is above the threshold', async function () {
await servers[0].views.simulateViewer({ id: videoUUID, currentTimes: [ 1, 4 ] })
await processViewsBuffer(servers)
await checkCounter('views', videoUUID, 1)
})
it('Should not view again this video with the same IP', async function () {
await servers[0].views.simulateViewer({ id: videoUUID, xForwardedFor: '0.0.0.1,127.0.0.1', currentTimes: [ 1, 4 ] })
await servers[0].views.simulateViewer({ id: videoUUID, xForwardedFor: '0.0.0.1,127.0.0.1', currentTimes: [ 1, 4 ] })
await processViewsBuffer(servers)
await checkCounter('views', videoUUID, 2)
})
it('Should view the video from server 2 and send the event', async function () {
await servers[1].views.simulateViewer({ id: videoUUID, currentTimes: [ 1, 4 ] })
await waitJobs(servers)
await processViewsBuffer(servers)
await checkCounter('views', videoUUID, 3)
})
})
describe('Test views and viewers counters on live and VOD', function () {
let liveVideoId: string
let vodVideoId: string
let command: FfmpegCommand
before(async function () {
this.timeout(240000);
({ vodVideoId, liveVideoId, ffmpegCommand: command } = await prepareViewsVideos({ servers, live: true, vod: true }))
})
it('Should display no views and viewers', async function () {
await checkCounter('views', liveVideoId, 0)
await checkCounter('viewers', liveVideoId, 0)
await checkCounter('views', vodVideoId, 0)
await checkCounter('viewers', vodVideoId, 0)
})
it('Should view twice and display 1 view/viewer', async function () {
this.timeout(30000)
for (let i = 0; i < 3; i++) {
await servers[0].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] })
await servers[0].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] })
await servers[0].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] })
await servers[0].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] })
await wait(1000)
}
await waitJobs(servers)
await checkCounter('viewers', liveVideoId, 1)
await checkCounter('viewers', vodVideoId, 1)
await processViewsBuffer(servers)
await checkCounter('views', liveVideoId, 1)
await checkCounter('views', vodVideoId, 1)
})
it('Should wait and display 0 viewers but still have 1 view', async function () {
this.timeout(45000)
let error = false
do {
try {
await checkCounter('views', liveVideoId, 1)
await checkCounter('viewers', liveVideoId, 0)
await checkCounter('views', vodVideoId, 1)
await checkCounter('viewers', vodVideoId, 0)
error = false
await wait(2500)
} catch {
error = true
}
} while (error)
})
it('Should view on a remote and on local and display appropriate views/viewers', async function () {
this.timeout(30000)
await servers[0].views.simulateViewer({ id: vodVideoId, xForwardedFor: '0.0.0.1,127.0.0.1', currentTimes: [ 0, 5 ] })
await servers[0].views.simulateViewer({ id: vodVideoId, xForwardedFor: '0.0.0.1,127.0.0.1', currentTimes: [ 0, 5 ] })
await servers[0].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] })
await servers[1].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] })
await servers[1].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] })
await servers[0].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] })
await servers[1].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] })
await servers[1].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] })
await wait(3000) // Throttled federation
await waitJobs(servers)
await checkCounter('viewers', liveVideoId, 2)
await checkCounter('viewers', vodVideoId, 3)
await processViewsBuffer(servers)
await checkCounter('views', liveVideoId, 3)
await checkCounter('views', vodVideoId, 4)
})
after(async function () {
await stopFfmpeg(command)
})
})
}
describe('Federation V1', function () {
before(async function () { before(async function () {
this.timeout(120000) this.timeout(120000)
const { uuid } = await servers[0].videos.quickUpload({ name: 'video' }) servers = await prepareViewsServers({ viewExpiration: '5 seconds', viewersFederationV2: false })
videoUUID = uuid
await waitJobs(servers)
}) })
it('Should not view a video if watch time is below the threshold', async function () { runTests()
await servers[0].views.simulateViewer({ id: videoUUID, currentTimes: [ 1, 2 ] })
await processViewsBuffer(servers)
await checkCounter('views', videoUUID, 0)
})
it('Should view a video if watch time is above the threshold', async function () {
await servers[0].views.simulateViewer({ id: videoUUID, currentTimes: [ 1, 4 ] })
await processViewsBuffer(servers)
await checkCounter('views', videoUUID, 1)
})
it('Should not view again this video with the same IP', async function () {
await servers[0].views.simulateViewer({ id: videoUUID, xForwardedFor: '0.0.0.1,127.0.0.1', currentTimes: [ 1, 4 ] })
await servers[0].views.simulateViewer({ id: videoUUID, xForwardedFor: '0.0.0.1,127.0.0.1', currentTimes: [ 1, 4 ] })
await processViewsBuffer(servers)
await checkCounter('views', videoUUID, 2)
})
it('Should view the video from server 2 and send the event', async function () {
await servers[1].views.simulateViewer({ id: videoUUID, currentTimes: [ 1, 4 ] })
await waitJobs(servers)
await processViewsBuffer(servers)
await checkCounter('views', videoUUID, 3)
})
})
describe('Test views and viewers counters on live and VOD', function () {
let liveVideoId: string
let vodVideoId: string
let command: FfmpegCommand
before(async function () {
this.timeout(240000);
({ vodVideoId, liveVideoId, ffmpegCommand: command } = await prepareViewsVideos({ servers, live: true, vod: true }))
})
it('Should display no views and viewers', async function () {
await checkCounter('views', liveVideoId, 0)
await checkCounter('viewers', liveVideoId, 0)
await checkCounter('views', vodVideoId, 0)
await checkCounter('viewers', vodVideoId, 0)
})
it('Should view twice and display 1 view/viewer', async function () {
this.timeout(30000)
await servers[0].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] })
await servers[0].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] })
await servers[0].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] })
await servers[0].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] })
await waitJobs(servers)
await checkCounter('viewers', liveVideoId, 1)
await checkCounter('viewers', vodVideoId, 1)
await processViewsBuffer(servers)
await checkCounter('views', liveVideoId, 1)
await checkCounter('views', vodVideoId, 1)
})
it('Should wait and display 0 viewers but still have 1 view', async function () {
this.timeout(30000)
await wait(12000)
await waitJobs(servers)
await checkCounter('views', liveVideoId, 1)
await checkCounter('viewers', liveVideoId, 0)
await checkCounter('views', vodVideoId, 1)
await checkCounter('viewers', vodVideoId, 0)
})
it('Should view on a remote and on local and display 2 viewers and 3 views', async function () {
this.timeout(30000)
await servers[0].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] })
await servers[1].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] })
await servers[1].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] })
await servers[0].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] })
await servers[1].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] })
await servers[1].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] })
await waitJobs(servers)
await checkCounter('viewers', liveVideoId, 2)
await checkCounter('viewers', vodVideoId, 2)
await processViewsBuffer(servers)
await checkCounter('views', liveVideoId, 3)
await checkCounter('views', vodVideoId, 3)
})
after(async function () { after(async function () {
await stopFfmpeg(command) await cleanupTests(servers)
}) })
}) })
after(async function () { describe('Federation V2', function () {
await cleanupTests(servers)
before(async function () {
this.timeout(120000)
servers = await prepareViewsServers({ viewExpiration: '5 seconds', viewersFederationV2: true })
})
runTests()
after(async function () {
await cleanupTests(servers)
})
}) })
}) })

View File

@ -3,6 +3,7 @@
import { expect } from 'chai' import { expect } from 'chai'
import { prepareViewsServers, prepareViewsVideos, processViewersStats } from '@tests/shared/views.js' import { prepareViewsServers, prepareViewsVideos, processViewersStats } from '@tests/shared/views.js'
import { cleanupTests, PeerTubeServer } from '@peertube/peertube-server-commands' import { cleanupTests, PeerTubeServer } from '@peertube/peertube-server-commands'
import { wait } from '@peertube/peertube-core-utils'
describe('Test views retention stats', function () { describe('Test views retention stats', function () {
let servers: PeerTubeServer[] let servers: PeerTubeServer[]
@ -45,6 +46,28 @@ describe('Test views retention stats', function () {
expect(data.map(d => d.retentionPercent)).to.deep.equal([ 50, 75, 25, 25, 25, 0 ]) expect(data.map(d => d.retentionPercent)).to.deep.equal([ 50, 75, 25, 25, 25, 0 ])
}) })
it('Should display appropriate retention metrics after a server restart', async function () {
this.timeout(240000)
const newVideo = await servers[0].videos.quickUpload({ name: 'video 2' })
await servers[0].views.simulateViewer({ xForwardedFor: '127.0.0.2,127.0.0.1', id: newVideo.id, currentTimes: [ 0, 1 ] })
await servers[0].views.simulateViewer({ xForwardedFor: '127.0.0.3,127.0.0.1', id: newVideo.id, currentTimes: [ 1, 3 ] })
await wait(2500)
await servers[0].kill()
await servers[0].run()
await processViewersStats(servers)
const { data } = await servers[0].videoStats.getRetentionStats({ videoId: newVideo.id })
expect(data).to.have.lengthOf(6)
expect(data.map(d => d.retentionPercent)).to.deep.equal([ 50, 100, 50, 50, 0, 0 ])
})
}) })
after(async function () { after(async function () {

View File

@ -30,8 +30,17 @@ async function processViewsBuffer (servers: PeerTubeServer[]) {
await waitJobs(servers) await waitJobs(servers)
} }
async function prepareViewsServers () { async function prepareViewsServers (options: {
const servers = await createMultipleServers(2) viewersFederationV2?: boolean
viewExpiration?: string // default 1 second
} = {}) {
const { viewExpiration = '1 second' } = options
const env = options?.viewersFederationV2 === true
? { USE_VIEWERS_FEDERATION_V2: 'true' }
: undefined
const servers = await createMultipleServers(2, { views: { videos: { ip_view_expiration: viewExpiration } } }, { env })
await setAccessTokensToServers(servers) await setAccessTokensToServers(servers)
await setDefaultVideoChannel(servers) await setDefaultVideoChannel(servers)

View File

@ -196,11 +196,17 @@ const contextStore: { [ id in ContextType ]: (string | { [ id: string ]: string
uuid: 'sc:identifier' uuid: 'sc:identifier'
}), }),
View: buildContext({
WatchAction: 'sc:WatchAction',
InteractionCounter: 'sc:InteractionCounter',
interactionType: 'sc:interactionType',
userInteractionCount: 'sc:userInteractionCount'
}),
Collection: buildContext(), Collection: buildContext(),
Follow: buildContext(), Follow: buildContext(),
Reject: buildContext(), Reject: buildContext(),
Accept: buildContext(), Accept: buildContext(),
View: buildContext(),
Announce: buildContext(), Announce: buildContext(),
Comment: buildContext(), Comment: buildContext(),
Delete: buildContext(), Delete: buildContext(),

View File

@ -9,7 +9,6 @@ export function Debounce (config: { timeoutMS: number }) {
timeoutRef = setTimeout(() => { timeoutRef = setTimeout(() => {
original.apply(this, args) original.apply(this, args)
}, config.timeoutMS) }, config.timeoutMS)
} }
} }

View File

@ -480,6 +480,7 @@ const VIEW_LIFETIME = {
VIEWER_COUNTER: 60000 * 2, // 2 minutes VIEWER_COUNTER: 60000 * 2, // 2 minutes
VIEWER_STATS: 60000 * 60 // 1 hour VIEWER_STATS: 60000 * 60 // 1 hour
} }
let VIEWER_SYNC_REDIS = 30000 // Sync viewer into redis
const MAX_LOCAL_VIEWER_WATCH_SECTIONS = 100 const MAX_LOCAL_VIEWER_WATCH_SECTIONS = 100
@ -898,6 +899,9 @@ const LRU_CACHE = {
USER_TOKENS: { USER_TOKENS: {
MAX_SIZE: 1000 MAX_SIZE: 1000
}, },
LOCAL_VIDEO_VIEWERS_FEDERATION: {
MAX_SIZE: 1000
},
FILENAME_TO_PATH_PERMANENT_FILE_CACHE: { FILENAME_TO_PATH_PERMANENT_FILE_CACHE: {
MAX_SIZE: 1000 MAX_SIZE: 1000
}, },
@ -1102,6 +1106,8 @@ if (process.env.PRODUCTION_CONSTANTS !== 'true') {
PLUGIN_EXTERNAL_AUTH_TOKEN_LIFETIME = 5000 PLUGIN_EXTERNAL_AUTH_TOKEN_LIFETIME = 5000
JOB_REMOVAL_OPTIONS.SUCCESS['videos-views-stats'] = 10000 JOB_REMOVAL_OPTIONS.SUCCESS['videos-views-stats'] = 10000
VIEWER_SYNC_REDIS = 1000
} }
if (isTestInstance()) { if (isTestInstance()) {
@ -1202,6 +1208,7 @@ export {
DEFAULT_THEME_NAME, DEFAULT_THEME_NAME,
NSFW_POLICY_TYPES, NSFW_POLICY_TYPES,
STATIC_MAX_AGE, STATIC_MAX_AGE,
VIEWER_SYNC_REDIS,
STATIC_PATHS, STATIC_PATHS,
VIDEO_IMPORT_TIMEOUT, VIDEO_IMPORT_TIMEOUT,
VIDEO_PLAYLIST_TYPES, VIDEO_PLAYLIST_TYPES,

View File

@ -28,11 +28,15 @@ async function processCreateView (activity: ActivityView, byActor: MActorSignatu
allowRefresh: false allowRefresh: false
}) })
const viewerExpires = activity.expires await VideoViewsManager.Instance.processRemoteView({
? new Date(activity.expires) video,
: undefined viewerId: activity.id,
await VideoViewsManager.Instance.processRemoteView({ video, viewerId: activity.id, viewerExpires }) viewerExpires: activity.expires
? new Date(activity.expires)
: undefined,
viewerResultCounter: getViewerResultCounter(activity)
})
if (video.isOwned()) { if (video.isOwned()) {
// Forward the view but don't resend the activity to the sender // Forward the view but don't resend the activity to the sender
@ -40,3 +44,15 @@ async function processCreateView (activity: ActivityView, byActor: MActorSignatu
await forwardVideoRelatedActivity(activity, undefined, exceptions, video) await forwardVideoRelatedActivity(activity, undefined, exceptions, video)
} }
} }
// Viewer protocol V2
function getViewerResultCounter (activity: ActivityView) {
const result = activity.result
if (!activity.expires || result?.interactionType !== 'WatchAction' || result?.type !== 'InteractionCounter') return undefined
const counter = parseInt(result.userInteractionCount + '')
if (isNaN(counter)) return undefined
return counter
}

View File

@ -6,24 +6,23 @@ import { logger } from '../../../helpers/logger.js'
import { audiencify, getAudience } from '../audience.js' import { audiencify, getAudience } from '../audience.js'
import { getLocalVideoViewActivityPubUrl } from '../url.js' import { getLocalVideoViewActivityPubUrl } from '../url.js'
import { sendVideoRelatedActivity } from './shared/send-utils.js' import { sendVideoRelatedActivity } from './shared/send-utils.js'
import { isUsingViewersFederationV2 } from '@peertube/peertube-node-utils'
type ViewType = 'view' | 'viewer'
async function sendView (options: { async function sendView (options: {
byActor: MActorLight byActor: MActorLight
type: ViewType
video: MVideoImmutable video: MVideoImmutable
viewerIdentifier: string viewerIdentifier: string
viewersCount?: number
transaction?: Transaction transaction?: Transaction
}) { }) {
const { byActor, type, video, viewerIdentifier, transaction } = options const { byActor, viewersCount, video, viewerIdentifier, transaction } = options
logger.info('Creating job to send %s of %s.', type, video.url) logger.info('Creating job to send %s of %s.', viewersCount !== undefined ? 'viewer' : 'view', video.url)
const activityBuilder = (audience: ActivityAudience) => { const activityBuilder = (audience: ActivityAudience) => {
const url = getLocalVideoViewActivityPubUrl(byActor, video, viewerIdentifier) const url = getLocalVideoViewActivityPubUrl(byActor, video, viewerIdentifier)
return buildViewActivity({ url, byActor, video, audience, type }) return buildViewActivity({ url, byActor, video, audience, viewersCount })
} }
return sendVideoRelatedActivity(activityBuilder, { byActor, video, transaction, contextType: 'View', parallelizable: true }) return sendVideoRelatedActivity(activityBuilder, { byActor, video, transaction, contextType: 'View', parallelizable: true })
@ -41,22 +40,33 @@ function buildViewActivity (options: {
url: string url: string
byActor: MActorAudience byActor: MActorAudience
video: MVideoUrl video: MVideoUrl
type: ViewType viewersCount?: number
audience?: ActivityAudience audience?: ActivityAudience
}): ActivityView { }): ActivityView {
const { url, byActor, type, video, audience = getAudience(byActor) } = options const { url, byActor, viewersCount, video, audience = getAudience(byActor) } = options
return audiencify( const base = {
{ id: url,
id: url, type: 'View' as 'View',
type: 'View' as 'View', actor: byActor.url,
actor: byActor.url, object: video.url
object: video.url, }
expires: type === 'viewer' if (viewersCount === undefined) {
? new Date(VideoViewsManager.Instance.buildViewerExpireTime()).toISOString() return audiencify(base, audience)
: undefined }
},
audience return audiencify({
) ...base,
expires: new Date(VideoViewsManager.Instance.buildViewerExpireTime()).toISOString(),
result: isUsingViewersFederationV2()
? {
interactionType: 'WatchAction',
type: 'InteractionCounter',
userInteractionCount: viewersCount
}
: undefined
}, audience)
} }

View File

@ -4,7 +4,7 @@ import express from 'express'
import { readFile } from 'fs/promises' import { readFile } from 'fs/promises'
import { join } from 'path' import { join } from 'path'
import { logger } from '../../../helpers/logger.js' import { logger } from '../../../helpers/logger.js'
import { CUSTOM_HTML_TAG_COMMENTS, FILES_CONTENT_HASH, PLUGIN_GLOBAL_CSS_PATH, WEBSERVER } from '../../../initializers/constants.js' import { CUSTOM_HTML_TAG_COMMENTS, FILES_CONTENT_HASH, PLUGIN_GLOBAL_CSS_PATH } from '../../../initializers/constants.js'
import { ServerConfigManager } from '../../server-config-manager.js' import { ServerConfigManager } from '../../server-config-manager.js'
import { TagsHtml } from './tags-html.js' import { TagsHtml } from './tags-html.js'
import { pathExists } from 'fs-extra/esm' import { pathExists } from 'fs-extra/esm'
@ -94,7 +94,7 @@ export class PageHtml {
// Save locale in cookies // Save locale in cookies
res.cookie('clientLanguage', lang, { res.cookie('clientLanguage', lang, {
secure: WEBSERVER.SCHEME === 'https', secure: true,
sameSite: 'none', sameSite: 'none',
maxAge: 1000 * 3600 * 24 * 90 // 3 months maxAge: 1000 * 3600 * 24 * 90 // 3 months
}) })

View File

@ -1,4 +1,5 @@
import { buildUUID, isTestOrDevInstance, sha256 } from '@peertube/peertube-node-utils' import { buildUUID, isTestOrDevInstance, isUsingViewersFederationV2, sha256 } from '@peertube/peertube-node-utils'
import { exists } from '@server/helpers/custom-validators/misc.js'
import { logger, loggerTagsFactory } from '@server/helpers/logger.js' import { logger, loggerTagsFactory } from '@server/helpers/logger.js'
import { VIEW_LIFETIME } from '@server/initializers/constants.js' import { VIEW_LIFETIME } from '@server/initializers/constants.js'
import { sendView } from '@server/lib/activitypub/send/send-view.js' import { sendView } from '@server/lib/activitypub/send/send-view.js'
@ -17,6 +18,7 @@ type Viewer = {
id: string id: string
viewerScope: ViewerScope viewerScope: ViewerScope
videoScope: VideoScope videoScope: VideoScope
viewerCount: number
lastFederation?: number lastFederation?: number
} }
@ -31,7 +33,7 @@ export class VideoViewerCounters {
private processingViewerCounters = false private processingViewerCounters = false
constructor () { constructor () {
setInterval(() => this.cleanViewerCounters(), VIEW_LIFETIME.VIEWER_COUNTER) setInterval(() => this.cleanViewerCounters(), VIEW_LIFETIME.VIEWER_COUNTER * 0.75)
} }
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@ -54,22 +56,48 @@ export class VideoViewerCounters {
return false return false
} }
const newViewer = await this.addViewerToVideo({ viewerId, video, viewerScope: 'local' }) const newViewer = this.addViewerToVideo({ viewerId, video, viewerScope: 'local', viewerCount: 1 })
await this.federateViewerIfNeeded(video, newViewer) await this.federateViewerIfNeeded(video, newViewer)
return true return true
} }
async addRemoteViewer (options: { addRemoteViewerOnLocalVideo (options: {
video: MVideo video: MVideo
viewerId: string viewerId: string
viewerExpires: Date viewerExpires: Date
}) { }) {
const { video, viewerExpires, viewerId } = options const { video, viewerExpires, viewerId } = options
logger.debug('Adding remote viewer to video %s.', video.uuid, { ...lTags(video.uuid) }) logger.debug('Adding remote viewer to local video %s.', video.uuid, { viewerId, viewerExpires, ...lTags(video.uuid) })
await this.addViewerToVideo({ video, viewerExpires, viewerId, viewerScope: 'remote' }) this.addViewerToVideo({ video, viewerExpires, viewerId, viewerScope: 'remote', viewerCount: 1 })
return true
}
addRemoteViewerOnRemoteVideo (options: {
video: MVideo
viewerId: string
viewerExpires: Date
viewerResultCounter?: number
}) {
const { video, viewerExpires, viewerId, viewerResultCounter } = options
logger.debug(
'Adding remote viewer to remote video %s.', video.uuid,
{ viewerId, viewerResultCounter, viewerExpires, ...lTags(video.uuid) }
)
this.addViewerToVideo({
video,
viewerExpires,
viewerId,
viewerScope: 'remote',
// The origin server sends a summary of all viewers, so we can replace our local copy
replaceCurrentViewers: exists(viewerResultCounter),
viewerCount: viewerResultCounter ?? 1
})
return true return true
} }
@ -83,17 +111,17 @@ export class VideoViewerCounters {
let total = 0 let total = 0
for (const viewers of this.viewersPerVideo.values()) { for (const viewers of this.viewersPerVideo.values()) {
total += viewers.filter(v => v.viewerScope === options.viewerScope && v.videoScope === options.videoScope).length total += viewers.filter(v => v.viewerScope === options.viewerScope && v.videoScope === options.videoScope)
.reduce((p, c) => p + c.viewerCount, 0)
} }
return total return total
} }
getViewers (video: MVideo) { getTotalViewersOf (video: MVideoImmutable) {
const viewers = this.viewersPerVideo.get(video.id) const viewers = this.viewersPerVideo.get(video.id)
if (!viewers) return 0
return viewers.length return viewers?.reduce((p, c) => p + c.viewerCount, 0) || 0
} }
buildViewerExpireTime () { buildViewerExpireTime () {
@ -102,17 +130,19 @@ export class VideoViewerCounters {
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
private async addViewerToVideo (options: { private addViewerToVideo (options: {
video: MVideoImmutable video: MVideoImmutable
viewerId: string viewerId: string
viewerScope: ViewerScope viewerScope: ViewerScope
viewerCount: number
replaceCurrentViewers?: boolean
viewerExpires?: Date viewerExpires?: Date
}) { }) {
const { video, viewerExpires, viewerId, viewerScope } = options const { video, viewerExpires, viewerId, viewerScope, viewerCount, replaceCurrentViewers } = options
let watchers = this.viewersPerVideo.get(video.id) let watchers = this.viewersPerVideo.get(video.id)
if (!watchers) { if (!watchers || replaceCurrentViewers) {
watchers = [] watchers = []
this.viewersPerVideo.set(video.id, watchers) this.viewersPerVideo.set(video.id, watchers)
} }
@ -125,12 +155,12 @@ export class VideoViewerCounters {
? 'remote' ? 'remote'
: 'local' : 'local'
const viewer = { id: viewerId, expires, videoScope, viewerScope } const viewer = { id: viewerId, expires, videoScope, viewerScope, viewerCount }
watchers.push(viewer) watchers.push(viewer)
this.idToViewer.set(viewerId, viewer) this.idToViewer.set(viewerId, viewer)
await this.notifyClients(video.id, watchers.length) this.notifyClients(video)
return viewer return viewer
} }
@ -162,7 +192,16 @@ export class VideoViewerCounters {
if (newViewers.length === 0) this.viewersPerVideo.delete(videoId) if (newViewers.length === 0) this.viewersPerVideo.delete(videoId)
else this.viewersPerVideo.set(videoId, newViewers) else this.viewersPerVideo.set(videoId, newViewers)
await this.notifyClients(videoId, newViewers.length) const video = await VideoModel.loadImmutableAttributes(videoId)
if (video) {
this.notifyClients(video)
// Let total viewers expire on remote instances if there are no more viewers
if (video.remote === false && newViewers.length !== 0) {
await this.federateTotalViewers(video)
}
}
} }
} catch (err) { } catch (err) {
logger.error('Error in video clean viewers scheduler.', { err, ...lTags() }) logger.error('Error in video clean viewers scheduler.', { err, ...lTags() })
@ -171,13 +210,11 @@ export class VideoViewerCounters {
this.processingViewerCounters = false this.processingViewerCounters = false
} }
private async notifyClients (videoId: string | number, viewersLength: number) { private notifyClients (video: MVideoImmutable) {
const video = await VideoModel.loadImmutableAttributes(videoId) const totalViewers = this.getTotalViewersOf(video)
if (!video) return PeerTubeSocket.Instance.sendVideoViewsUpdate(video, totalViewers)
PeerTubeSocket.Instance.sendVideoViewsUpdate(video, viewersLength) logger.debug('Video viewers update for %s is %d.', video.url, totalViewers, lTags())
logger.debug('Video viewers update for %s is %d.', video.url, viewersLength, lTags())
} }
private generateViewerId (ip: string, videoUUID: string) { private generateViewerId (ip: string, videoUUID: string) {
@ -190,8 +227,26 @@ export class VideoViewerCounters {
const federationLimit = now - (VIEW_LIFETIME.VIEWER_COUNTER * 0.75) const federationLimit = now - (VIEW_LIFETIME.VIEWER_COUNTER * 0.75)
if (viewer.lastFederation && viewer.lastFederation > federationLimit) return if (viewer.lastFederation && viewer.lastFederation > federationLimit) return
if (video.remote === false && isUsingViewersFederationV2()) return
await sendView({
byActor: await getServerActor(),
video,
viewersCount: 1,
viewerIdentifier: viewer.id
})
await sendView({ byActor: await getServerActor(), video, type: 'viewer', viewerIdentifier: viewer.id })
viewer.lastFederation = now viewer.lastFederation = now
} }
private async federateTotalViewers (video: MVideoImmutable) {
if (!isUsingViewersFederationV2()) return
await sendView({
byActor: await getServerActor(),
video,
viewersCount: this.getTotalViewersOf(video),
viewerIdentifier: video.uuid
})
}
} }

View File

@ -3,7 +3,7 @@ import { VideoViewEvent } from '@peertube/peertube-models'
import { isTestOrDevInstance } from '@peertube/peertube-node-utils' import { isTestOrDevInstance } from '@peertube/peertube-node-utils'
import { GeoIP } from '@server/helpers/geo-ip.js' import { GeoIP } from '@server/helpers/geo-ip.js'
import { logger, loggerTagsFactory } from '@server/helpers/logger.js' import { logger, loggerTagsFactory } from '@server/helpers/logger.js'
import { MAX_LOCAL_VIEWER_WATCH_SECTIONS, VIEW_LIFETIME } from '@server/initializers/constants.js' import { MAX_LOCAL_VIEWER_WATCH_SECTIONS, VIEWER_SYNC_REDIS, VIEW_LIFETIME } from '@server/initializers/constants.js'
import { sequelizeTypescript } from '@server/initializers/database.js' import { sequelizeTypescript } from '@server/initializers/database.js'
import { sendCreateWatchAction } from '@server/lib/activitypub/send/index.js' import { sendCreateWatchAction } from '@server/lib/activitypub/send/index.js'
import { getLocalVideoViewerActivityPubUrl } from '@server/lib/activitypub/url.js' import { getLocalVideoViewerActivityPubUrl } from '@server/lib/activitypub/url.js'
@ -33,11 +33,14 @@ type LocalViewerStats = {
export class VideoViewerStats { export class VideoViewerStats {
private processingViewersStats = false private processingViewersStats = false
private processingRedisWrites = false
private readonly viewerCache = new Map<string, LocalViewerStats>() private readonly viewerCache = new Map<string, LocalViewerStats>()
private readonly redisPendingWrites = new Map<string, { ip: string, videoId: number, stats: LocalViewerStats }>()
constructor () { constructor () {
setInterval(() => this.processViewerStats(), VIEW_LIFETIME.VIEWER_STATS) setInterval(() => this.processViewerStats(), VIEW_LIFETIME.VIEWER_STATS)
setInterval(() => this.syncRedisWrites(), VIEWER_SYNC_REDIS)
} }
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@ -123,7 +126,7 @@ export class VideoViewerStats {
logger.debug('Set local video viewer stats for video %s.', video.uuid, { stats, ...lTags(video.uuid) }) logger.debug('Set local video viewer stats for video %s.', video.uuid, { stats, ...lTags(video.uuid) })
await this.setLocalVideoViewer(ip, video.id, stats) this.setLocalVideoViewer(ip, video.id, stats)
} }
async processViewerStats () { async processViewerStats () {
@ -135,6 +138,8 @@ export class VideoViewerStats {
const now = new Date().getTime() const now = new Date().getTime()
try { try {
await this.syncRedisWrites()
const allKeys = await Redis.Instance.listLocalVideoViewerKeys() const allKeys = await Redis.Instance.listLocalVideoViewerKeys()
for (const key of allKeys) { for (const key of allKeys) {
@ -222,7 +227,7 @@ export class VideoViewerStats {
const { viewerKey } = Redis.Instance.generateLocalVideoViewerKeys(ip, videoId) const { viewerKey } = Redis.Instance.generateLocalVideoViewerKeys(ip, videoId)
this.viewerCache.set(viewerKey, stats) this.viewerCache.set(viewerKey, stats)
return Redis.Instance.setLocalVideoViewer(ip, videoId, stats) this.redisPendingWrites.set(viewerKey, { ip, videoId, stats })
} }
private deleteLocalVideoViewersKeys (key: string) { private deleteLocalVideoViewersKeys (key: string) {
@ -230,4 +235,23 @@ export class VideoViewerStats {
return Redis.Instance.deleteLocalVideoViewersKeys(key) return Redis.Instance.deleteLocalVideoViewersKeys(key)
} }
private async syncRedisWrites () {
if (this.processingRedisWrites) return
this.processingRedisWrites = true
for (const [ key, pendingWrite ] of this.redisPendingWrites) {
const { ip, videoId, stats } = pendingWrite
this.redisPendingWrites.delete(key)
try {
await Redis.Instance.setLocalVideoViewer(ip, videoId, stats)
} catch (err) {
logger.error('Cannot write viewer into redis', { ip, videoId, stats, err })
}
}
this.processingRedisWrites = false
}
} }

View File

@ -35,7 +35,7 @@ export class VideoViews {
await this.addView(video) await this.addView(video)
await sendView({ byActor: await getServerActor(), video, type: 'view', viewerIdentifier: buildUUID() }) await sendView({ byActor: await getServerActor(), video, viewerIdentifier: buildUUID() })
return true return true
} }

View File

@ -66,17 +66,29 @@ export class VideoViewsManager {
video: MVideo video: MVideo
viewerId: string | null viewerId: string | null
viewerExpires?: Date viewerExpires?: Date
viewerResultCounter?: number
}) { }) {
const { video, viewerId, viewerExpires } = options const { video, viewerId, viewerExpires, viewerResultCounter } = options
logger.debug('Processing remote view for %s.', video.url, { viewerExpires, viewerId, ...lTags() }) logger.debug('Processing remote view for %s.', video.url, { viewerExpires, viewerId, ...lTags() })
if (viewerExpires) await this.videoViewerCounters.addRemoteViewer({ video, viewerId, viewerExpires }) // Viewer
else await this.videoViews.addRemoteView({ video }) if (viewerExpires) {
if (video.remote === false) {
this.videoViewerCounters.addRemoteViewerOnLocalVideo({ video, viewerId, viewerExpires })
return
}
this.videoViewerCounters.addRemoteViewerOnRemoteVideo({ video, viewerId, viewerExpires, viewerResultCounter })
return
}
// Just a view
await this.videoViews.addRemoteView({ video })
} }
getViewers (video: MVideo) { getTotalViewersOf (video: MVideo) {
return this.videoViewerCounters.getViewers(video) return this.videoViewerCounters.getTotalViewersOf(video)
} }
getTotalViewers (options: { getTotalViewers (options: {

View File

@ -1,4 +1,3 @@
import Bluebird from 'bluebird'
import { NextFunction, Request, RequestHandler, Response } from 'express' import { NextFunction, Request, RequestHandler, Response } from 'express'
import { ValidationChain } from 'express-validator' import { ValidationChain } from 'express-validator'
import { ExpressPromiseHandler } from '@server/types/express-handler.js' import { ExpressPromiseHandler } from '@server/types/express-handler.js'
@ -9,22 +8,27 @@ import { retryTransactionWrapper } from '../helpers/database-utils.js'
export type RequestPromiseHandler = ValidationChain | ExpressPromiseHandler export type RequestPromiseHandler = ValidationChain | ExpressPromiseHandler
function asyncMiddleware (fun: RequestPromiseHandler | RequestPromiseHandler[]) { function asyncMiddleware (fun: RequestPromiseHandler | RequestPromiseHandler[]) {
return (req: Request, res: Response, next: NextFunction) => { return async (req: Request, res: Response, next: NextFunction) => {
if (Array.isArray(fun) === true) { if (Array.isArray(fun) !== true) {
return Bluebird.each(fun as RequestPromiseHandler[], f => { return Promise.resolve((fun as RequestHandler)(req, res, next))
return new Promise<void>((resolve, reject) => { .catch(err => next(err))
}
try {
for (const f of (fun as RequestPromiseHandler[])) {
await new Promise<void>((resolve, reject) => {
return asyncMiddleware(f)(req, res, err => { return asyncMiddleware(f)(req, res, err => {
if (err) return reject(err) if (err) return reject(err)
return resolve() return resolve()
}) })
}) })
}).then(() => next()) }
.catch(err => next(err))
}
return Promise.resolve((fun as RequestHandler)(req, res, next)) next()
.catch(err => next(err)) } catch (err) {
next(err)
}
} }
} }

View File

@ -90,7 +90,7 @@ export function videoModelToFormattedJSON (video: MVideoFormattable, options: Vi
duration: video.duration, duration: video.duration,
views: video.views, views: video.views,
viewers: VideoViewsManager.Instance.getViewers(video), viewers: VideoViewsManager.Instance.getTotalViewersOf(video),
likes: video.likes, likes: video.likes,
dislikes: video.dislikes, dislikes: video.dislikes,

View File

@ -213,9 +213,6 @@ app.use(express.json({
} }
})) }))
// Cookies
app.use(cookieParser())
// W3C DNT Tracking Status // W3C DNT Tracking Status
app.use(advertiseDoNotTrack) app.use(advertiseDoNotTrack)
@ -230,9 +227,6 @@ app.use('/api/' + API_VERSION, apiRouter)
// Services (oembed...) // Services (oembed...)
app.use('/services', servicesRouter) app.use('/services', servicesRouter)
// Plugins & themes
app.use('/', pluginsRouter)
app.use('/', activityPubRouter) app.use('/', activityPubRouter)
app.use('/', feedsRouter) app.use('/', feedsRouter)
app.use('/', trackerRouter) app.use('/', trackerRouter)
@ -246,6 +240,12 @@ app.use('/', downloadRouter)
app.use('/', lazyStaticRouter) app.use('/', lazyStaticRouter)
app.use('/', objectStorageProxyRouter) app.use('/', objectStorageProxyRouter)
// Cookies for plugins and HTML
app.use(cookieParser())
// Plugins & themes
app.use('/', pluginsRouter)
// Client files, last valid routes! // Client files, last valid routes!
const cliOptions = cli.opts<{ client: boolean, plugins: boolean }>() const cliOptions = cli.opts<{ client: boolean, plugins: boolean }>()
if (cliOptions.client) app.use('/', clientsRouter) if (cliOptions.client) app.use('/', clientsRouter)