Compare commits
4 Commits
Author | SHA1 | Date | |
---|---|---|---|
![]() |
2af050cbf3 | ||
![]() |
f2065858b0 | ||
![]() |
56d711a863 | ||
![]() |
bbf585aad0 |
|
@ -116,6 +116,11 @@ export interface ActivityView extends BaseActivity {
|
|||
|
||||
// If sending a "viewer" event
|
||||
expires?: string
|
||||
result?: {
|
||||
type: 'InteractionCounter'
|
||||
interactionType: 'WatchAction'
|
||||
userInteractionCount: number
|
||||
}
|
||||
}
|
||||
|
||||
export interface ActivityDislike extends BaseActivity {
|
||||
|
|
|
@ -18,6 +18,7 @@ export interface VideoObject {
|
|||
licence: ActivityIdentifierObject
|
||||
language: ActivityIdentifierObject
|
||||
subtitleLanguage: ActivityIdentifierObject[]
|
||||
|
||||
views: number
|
||||
|
||||
sensitive: boolean
|
||||
|
|
|
@ -56,3 +56,7 @@ export function isProdInstance () {
|
|||
export function getAppNumber () {
|
||||
return process.env.NODE_APP_INSTANCE || ''
|
||||
}
|
||||
|
||||
export function isUsingViewersFederationV2 () {
|
||||
return process.env.USE_VIEWERS_FEDERATION_V2 === 'true'
|
||||
}
|
||||
|
|
|
@ -21,133 +21,171 @@ describe('Test video views/viewers counters', function () {
|
|||
}
|
||||
}
|
||||
|
||||
before(async function () {
|
||||
this.timeout(120000)
|
||||
function runTests () {
|
||||
describe('Test views counter on VOD', function () {
|
||||
let videoUUID: string
|
||||
|
||||
servers = await prepareViewsServers()
|
||||
})
|
||||
before(async function () {
|
||||
this.timeout(120000)
|
||||
|
||||
describe('Test views counter on VOD', function () {
|
||||
let videoUUID: string
|
||||
const { uuid } = await servers[0].videos.quickUpload({ name: 'video' })
|
||||
videoUUID = uuid
|
||||
|
||||
await waitJobs(servers)
|
||||
})
|
||||
|
||||
it('Should not view a video if watch time is below the threshold', async function () {
|
||||
await servers[0].views.simulateViewer({ id: videoUUID, currentTimes: [ 1, 2 ] })
|
||||
await processViewsBuffer(servers)
|
||||
|
||||
await checkCounter('views', videoUUID, 0)
|
||||
})
|
||||
|
||||
it('Should view a video if watch time is above the threshold', async function () {
|
||||
await servers[0].views.simulateViewer({ id: videoUUID, currentTimes: [ 1, 4 ] })
|
||||
await processViewsBuffer(servers)
|
||||
|
||||
await checkCounter('views', videoUUID, 1)
|
||||
})
|
||||
|
||||
it('Should not view again this video with the same IP', async function () {
|
||||
await servers[0].views.simulateViewer({ id: videoUUID, xForwardedFor: '0.0.0.1,127.0.0.1', currentTimes: [ 1, 4 ] })
|
||||
await servers[0].views.simulateViewer({ id: videoUUID, xForwardedFor: '0.0.0.1,127.0.0.1', currentTimes: [ 1, 4 ] })
|
||||
await processViewsBuffer(servers)
|
||||
|
||||
await checkCounter('views', videoUUID, 2)
|
||||
})
|
||||
|
||||
it('Should view the video from server 2 and send the event', async function () {
|
||||
await servers[1].views.simulateViewer({ id: videoUUID, currentTimes: [ 1, 4 ] })
|
||||
await waitJobs(servers)
|
||||
await processViewsBuffer(servers)
|
||||
|
||||
await checkCounter('views', videoUUID, 3)
|
||||
})
|
||||
})
|
||||
|
||||
describe('Test views and viewers counters on live and VOD', function () {
|
||||
let liveVideoId: string
|
||||
let vodVideoId: string
|
||||
let command: FfmpegCommand
|
||||
|
||||
before(async function () {
|
||||
this.timeout(240000);
|
||||
|
||||
({ vodVideoId, liveVideoId, ffmpegCommand: command } = await prepareViewsVideos({ servers, live: true, vod: true }))
|
||||
})
|
||||
|
||||
it('Should display no views and viewers', async function () {
|
||||
await checkCounter('views', liveVideoId, 0)
|
||||
await checkCounter('viewers', liveVideoId, 0)
|
||||
|
||||
await checkCounter('views', vodVideoId, 0)
|
||||
await checkCounter('viewers', vodVideoId, 0)
|
||||
})
|
||||
|
||||
it('Should view twice and display 1 view/viewer', async function () {
|
||||
this.timeout(30000)
|
||||
|
||||
for (let i = 0; i < 3; i++) {
|
||||
await servers[0].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] })
|
||||
await servers[0].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] })
|
||||
await servers[0].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] })
|
||||
await servers[0].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] })
|
||||
|
||||
await wait(1000)
|
||||
}
|
||||
|
||||
await waitJobs(servers)
|
||||
|
||||
await checkCounter('viewers', liveVideoId, 1)
|
||||
await checkCounter('viewers', vodVideoId, 1)
|
||||
|
||||
await processViewsBuffer(servers)
|
||||
|
||||
await checkCounter('views', liveVideoId, 1)
|
||||
await checkCounter('views', vodVideoId, 1)
|
||||
})
|
||||
|
||||
it('Should wait and display 0 viewers but still have 1 view', async function () {
|
||||
this.timeout(45000)
|
||||
|
||||
let error = false
|
||||
|
||||
do {
|
||||
try {
|
||||
await checkCounter('views', liveVideoId, 1)
|
||||
await checkCounter('viewers', liveVideoId, 0)
|
||||
|
||||
await checkCounter('views', vodVideoId, 1)
|
||||
await checkCounter('viewers', vodVideoId, 0)
|
||||
|
||||
error = false
|
||||
await wait(2500)
|
||||
} catch {
|
||||
error = true
|
||||
}
|
||||
} while (error)
|
||||
})
|
||||
|
||||
it('Should view on a remote and on local and display appropriate views/viewers', async function () {
|
||||
this.timeout(30000)
|
||||
|
||||
await servers[0].views.simulateViewer({ id: vodVideoId, xForwardedFor: '0.0.0.1,127.0.0.1', currentTimes: [ 0, 5 ] })
|
||||
await servers[0].views.simulateViewer({ id: vodVideoId, xForwardedFor: '0.0.0.1,127.0.0.1', currentTimes: [ 0, 5 ] })
|
||||
await servers[0].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] })
|
||||
await servers[1].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] })
|
||||
await servers[1].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] })
|
||||
|
||||
await servers[0].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] })
|
||||
await servers[1].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] })
|
||||
await servers[1].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] })
|
||||
|
||||
await wait(3000) // Throttled federation
|
||||
await waitJobs(servers)
|
||||
|
||||
await checkCounter('viewers', liveVideoId, 2)
|
||||
await checkCounter('viewers', vodVideoId, 3)
|
||||
|
||||
await processViewsBuffer(servers)
|
||||
|
||||
await checkCounter('views', liveVideoId, 3)
|
||||
await checkCounter('views', vodVideoId, 4)
|
||||
})
|
||||
|
||||
after(async function () {
|
||||
await stopFfmpeg(command)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
describe('Federation V1', function () {
|
||||
|
||||
before(async function () {
|
||||
this.timeout(120000)
|
||||
|
||||
const { uuid } = await servers[0].videos.quickUpload({ name: 'video' })
|
||||
videoUUID = uuid
|
||||
|
||||
await waitJobs(servers)
|
||||
servers = await prepareViewsServers({ viewExpiration: '5 seconds', viewersFederationV2: false })
|
||||
})
|
||||
|
||||
it('Should not view a video if watch time is below the threshold', async function () {
|
||||
await servers[0].views.simulateViewer({ id: videoUUID, currentTimes: [ 1, 2 ] })
|
||||
await processViewsBuffer(servers)
|
||||
|
||||
await checkCounter('views', videoUUID, 0)
|
||||
})
|
||||
|
||||
it('Should view a video if watch time is above the threshold', async function () {
|
||||
await servers[0].views.simulateViewer({ id: videoUUID, currentTimes: [ 1, 4 ] })
|
||||
await processViewsBuffer(servers)
|
||||
|
||||
await checkCounter('views', videoUUID, 1)
|
||||
})
|
||||
|
||||
it('Should not view again this video with the same IP', async function () {
|
||||
await servers[0].views.simulateViewer({ id: videoUUID, xForwardedFor: '0.0.0.1,127.0.0.1', currentTimes: [ 1, 4 ] })
|
||||
await servers[0].views.simulateViewer({ id: videoUUID, xForwardedFor: '0.0.0.1,127.0.0.1', currentTimes: [ 1, 4 ] })
|
||||
await processViewsBuffer(servers)
|
||||
|
||||
await checkCounter('views', videoUUID, 2)
|
||||
})
|
||||
|
||||
it('Should view the video from server 2 and send the event', async function () {
|
||||
await servers[1].views.simulateViewer({ id: videoUUID, currentTimes: [ 1, 4 ] })
|
||||
await waitJobs(servers)
|
||||
await processViewsBuffer(servers)
|
||||
|
||||
await checkCounter('views', videoUUID, 3)
|
||||
})
|
||||
})
|
||||
|
||||
describe('Test views and viewers counters on live and VOD', function () {
|
||||
let liveVideoId: string
|
||||
let vodVideoId: string
|
||||
let command: FfmpegCommand
|
||||
|
||||
before(async function () {
|
||||
this.timeout(240000);
|
||||
|
||||
({ vodVideoId, liveVideoId, ffmpegCommand: command } = await prepareViewsVideos({ servers, live: true, vod: true }))
|
||||
})
|
||||
|
||||
it('Should display no views and viewers', async function () {
|
||||
await checkCounter('views', liveVideoId, 0)
|
||||
await checkCounter('viewers', liveVideoId, 0)
|
||||
|
||||
await checkCounter('views', vodVideoId, 0)
|
||||
await checkCounter('viewers', vodVideoId, 0)
|
||||
})
|
||||
|
||||
it('Should view twice and display 1 view/viewer', async function () {
|
||||
this.timeout(30000)
|
||||
|
||||
await servers[0].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] })
|
||||
await servers[0].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] })
|
||||
await servers[0].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] })
|
||||
await servers[0].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] })
|
||||
|
||||
await waitJobs(servers)
|
||||
await checkCounter('viewers', liveVideoId, 1)
|
||||
await checkCounter('viewers', vodVideoId, 1)
|
||||
|
||||
await processViewsBuffer(servers)
|
||||
|
||||
await checkCounter('views', liveVideoId, 1)
|
||||
await checkCounter('views', vodVideoId, 1)
|
||||
})
|
||||
|
||||
it('Should wait and display 0 viewers but still have 1 view', async function () {
|
||||
this.timeout(30000)
|
||||
|
||||
await wait(12000)
|
||||
await waitJobs(servers)
|
||||
|
||||
await checkCounter('views', liveVideoId, 1)
|
||||
await checkCounter('viewers', liveVideoId, 0)
|
||||
|
||||
await checkCounter('views', vodVideoId, 1)
|
||||
await checkCounter('viewers', vodVideoId, 0)
|
||||
})
|
||||
|
||||
it('Should view on a remote and on local and display 2 viewers and 3 views', async function () {
|
||||
this.timeout(30000)
|
||||
|
||||
await servers[0].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] })
|
||||
await servers[1].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] })
|
||||
await servers[1].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] })
|
||||
|
||||
await servers[0].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] })
|
||||
await servers[1].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] })
|
||||
await servers[1].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] })
|
||||
|
||||
await waitJobs(servers)
|
||||
|
||||
await checkCounter('viewers', liveVideoId, 2)
|
||||
await checkCounter('viewers', vodVideoId, 2)
|
||||
|
||||
await processViewsBuffer(servers)
|
||||
|
||||
await checkCounter('views', liveVideoId, 3)
|
||||
await checkCounter('views', vodVideoId, 3)
|
||||
})
|
||||
runTests()
|
||||
|
||||
after(async function () {
|
||||
await stopFfmpeg(command)
|
||||
await cleanupTests(servers)
|
||||
})
|
||||
})
|
||||
|
||||
after(async function () {
|
||||
await cleanupTests(servers)
|
||||
describe('Federation V2', function () {
|
||||
|
||||
before(async function () {
|
||||
this.timeout(120000)
|
||||
|
||||
servers = await prepareViewsServers({ viewExpiration: '5 seconds', viewersFederationV2: true })
|
||||
})
|
||||
|
||||
runTests()
|
||||
|
||||
after(async function () {
|
||||
await cleanupTests(servers)
|
||||
})
|
||||
})
|
||||
})
|
||||
|
|
|
@ -3,6 +3,7 @@
|
|||
import { expect } from 'chai'
|
||||
import { prepareViewsServers, prepareViewsVideos, processViewersStats } from '@tests/shared/views.js'
|
||||
import { cleanupTests, PeerTubeServer } from '@peertube/peertube-server-commands'
|
||||
import { wait } from '@peertube/peertube-core-utils'
|
||||
|
||||
describe('Test views retention stats', function () {
|
||||
let servers: PeerTubeServer[]
|
||||
|
@ -45,6 +46,28 @@ describe('Test views retention stats', function () {
|
|||
|
||||
expect(data.map(d => d.retentionPercent)).to.deep.equal([ 50, 75, 25, 25, 25, 0 ])
|
||||
})
|
||||
|
||||
it('Should display appropriate retention metrics after a server restart', async function () {
|
||||
this.timeout(240000)
|
||||
|
||||
const newVideo = await servers[0].videos.quickUpload({ name: 'video 2' })
|
||||
|
||||
await servers[0].views.simulateViewer({ xForwardedFor: '127.0.0.2,127.0.0.1', id: newVideo.id, currentTimes: [ 0, 1 ] })
|
||||
await servers[0].views.simulateViewer({ xForwardedFor: '127.0.0.3,127.0.0.1', id: newVideo.id, currentTimes: [ 1, 3 ] })
|
||||
|
||||
await wait(2500)
|
||||
|
||||
await servers[0].kill()
|
||||
|
||||
await servers[0].run()
|
||||
|
||||
await processViewersStats(servers)
|
||||
|
||||
const { data } = await servers[0].videoStats.getRetentionStats({ videoId: newVideo.id })
|
||||
expect(data).to.have.lengthOf(6)
|
||||
|
||||
expect(data.map(d => d.retentionPercent)).to.deep.equal([ 50, 100, 50, 50, 0, 0 ])
|
||||
})
|
||||
})
|
||||
|
||||
after(async function () {
|
||||
|
|
|
@ -30,8 +30,17 @@ async function processViewsBuffer (servers: PeerTubeServer[]) {
|
|||
await waitJobs(servers)
|
||||
}
|
||||
|
||||
async function prepareViewsServers () {
|
||||
const servers = await createMultipleServers(2)
|
||||
async function prepareViewsServers (options: {
|
||||
viewersFederationV2?: boolean
|
||||
viewExpiration?: string // default 1 second
|
||||
} = {}) {
|
||||
const { viewExpiration = '1 second' } = options
|
||||
|
||||
const env = options?.viewersFederationV2 === true
|
||||
? { USE_VIEWERS_FEDERATION_V2: 'true' }
|
||||
: undefined
|
||||
|
||||
const servers = await createMultipleServers(2, { views: { videos: { ip_view_expiration: viewExpiration } } }, { env })
|
||||
await setAccessTokensToServers(servers)
|
||||
await setDefaultVideoChannel(servers)
|
||||
|
||||
|
|
|
@ -196,11 +196,17 @@ const contextStore: { [ id in ContextType ]: (string | { [ id: string ]: string
|
|||
uuid: 'sc:identifier'
|
||||
}),
|
||||
|
||||
View: buildContext({
|
||||
WatchAction: 'sc:WatchAction',
|
||||
InteractionCounter: 'sc:InteractionCounter',
|
||||
interactionType: 'sc:interactionType',
|
||||
userInteractionCount: 'sc:userInteractionCount'
|
||||
}),
|
||||
|
||||
Collection: buildContext(),
|
||||
Follow: buildContext(),
|
||||
Reject: buildContext(),
|
||||
Accept: buildContext(),
|
||||
View: buildContext(),
|
||||
Announce: buildContext(),
|
||||
Comment: buildContext(),
|
||||
Delete: buildContext(),
|
||||
|
|
|
@ -9,7 +9,6 @@ export function Debounce (config: { timeoutMS: number }) {
|
|||
|
||||
timeoutRef = setTimeout(() => {
|
||||
original.apply(this, args)
|
||||
|
||||
}, config.timeoutMS)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -480,6 +480,7 @@ const VIEW_LIFETIME = {
|
|||
VIEWER_COUNTER: 60000 * 2, // 2 minutes
|
||||
VIEWER_STATS: 60000 * 60 // 1 hour
|
||||
}
|
||||
let VIEWER_SYNC_REDIS = 30000 // Sync viewer into redis
|
||||
|
||||
const MAX_LOCAL_VIEWER_WATCH_SECTIONS = 100
|
||||
|
||||
|
@ -898,6 +899,9 @@ const LRU_CACHE = {
|
|||
USER_TOKENS: {
|
||||
MAX_SIZE: 1000
|
||||
},
|
||||
LOCAL_VIDEO_VIEWERS_FEDERATION: {
|
||||
MAX_SIZE: 1000
|
||||
},
|
||||
FILENAME_TO_PATH_PERMANENT_FILE_CACHE: {
|
||||
MAX_SIZE: 1000
|
||||
},
|
||||
|
@ -1102,6 +1106,8 @@ if (process.env.PRODUCTION_CONSTANTS !== 'true') {
|
|||
PLUGIN_EXTERNAL_AUTH_TOKEN_LIFETIME = 5000
|
||||
|
||||
JOB_REMOVAL_OPTIONS.SUCCESS['videos-views-stats'] = 10000
|
||||
|
||||
VIEWER_SYNC_REDIS = 1000
|
||||
}
|
||||
|
||||
if (isTestInstance()) {
|
||||
|
@ -1202,6 +1208,7 @@ export {
|
|||
DEFAULT_THEME_NAME,
|
||||
NSFW_POLICY_TYPES,
|
||||
STATIC_MAX_AGE,
|
||||
VIEWER_SYNC_REDIS,
|
||||
STATIC_PATHS,
|
||||
VIDEO_IMPORT_TIMEOUT,
|
||||
VIDEO_PLAYLIST_TYPES,
|
||||
|
|
|
@ -28,11 +28,15 @@ async function processCreateView (activity: ActivityView, byActor: MActorSignatu
|
|||
allowRefresh: false
|
||||
})
|
||||
|
||||
const viewerExpires = activity.expires
|
||||
? new Date(activity.expires)
|
||||
: undefined
|
||||
await VideoViewsManager.Instance.processRemoteView({
|
||||
video,
|
||||
viewerId: activity.id,
|
||||
|
||||
await VideoViewsManager.Instance.processRemoteView({ video, viewerId: activity.id, viewerExpires })
|
||||
viewerExpires: activity.expires
|
||||
? new Date(activity.expires)
|
||||
: undefined,
|
||||
viewerResultCounter: getViewerResultCounter(activity)
|
||||
})
|
||||
|
||||
if (video.isOwned()) {
|
||||
// Forward the view but don't resend the activity to the sender
|
||||
|
@ -40,3 +44,15 @@ async function processCreateView (activity: ActivityView, byActor: MActorSignatu
|
|||
await forwardVideoRelatedActivity(activity, undefined, exceptions, video)
|
||||
}
|
||||
}
|
||||
|
||||
// Viewer protocol V2
|
||||
function getViewerResultCounter (activity: ActivityView) {
|
||||
const result = activity.result
|
||||
|
||||
if (!activity.expires || result?.interactionType !== 'WatchAction' || result?.type !== 'InteractionCounter') return undefined
|
||||
|
||||
const counter = parseInt(result.userInteractionCount + '')
|
||||
if (isNaN(counter)) return undefined
|
||||
|
||||
return counter
|
||||
}
|
||||
|
|
|
@ -6,24 +6,23 @@ import { logger } from '../../../helpers/logger.js'
|
|||
import { audiencify, getAudience } from '../audience.js'
|
||||
import { getLocalVideoViewActivityPubUrl } from '../url.js'
|
||||
import { sendVideoRelatedActivity } from './shared/send-utils.js'
|
||||
|
||||
type ViewType = 'view' | 'viewer'
|
||||
import { isUsingViewersFederationV2 } from '@peertube/peertube-node-utils'
|
||||
|
||||
async function sendView (options: {
|
||||
byActor: MActorLight
|
||||
type: ViewType
|
||||
video: MVideoImmutable
|
||||
viewerIdentifier: string
|
||||
viewersCount?: number
|
||||
transaction?: Transaction
|
||||
}) {
|
||||
const { byActor, type, video, viewerIdentifier, transaction } = options
|
||||
const { byActor, viewersCount, video, viewerIdentifier, transaction } = options
|
||||
|
||||
logger.info('Creating job to send %s of %s.', type, video.url)
|
||||
logger.info('Creating job to send %s of %s.', viewersCount !== undefined ? 'viewer' : 'view', video.url)
|
||||
|
||||
const activityBuilder = (audience: ActivityAudience) => {
|
||||
const url = getLocalVideoViewActivityPubUrl(byActor, video, viewerIdentifier)
|
||||
|
||||
return buildViewActivity({ url, byActor, video, audience, type })
|
||||
return buildViewActivity({ url, byActor, video, audience, viewersCount })
|
||||
}
|
||||
|
||||
return sendVideoRelatedActivity(activityBuilder, { byActor, video, transaction, contextType: 'View', parallelizable: true })
|
||||
|
@ -41,22 +40,33 @@ function buildViewActivity (options: {
|
|||
url: string
|
||||
byActor: MActorAudience
|
||||
video: MVideoUrl
|
||||
type: ViewType
|
||||
viewersCount?: number
|
||||
audience?: ActivityAudience
|
||||
}): ActivityView {
|
||||
const { url, byActor, type, video, audience = getAudience(byActor) } = options
|
||||
const { url, byActor, viewersCount, video, audience = getAudience(byActor) } = options
|
||||
|
||||
return audiencify(
|
||||
{
|
||||
id: url,
|
||||
type: 'View' as 'View',
|
||||
actor: byActor.url,
|
||||
object: video.url,
|
||||
const base = {
|
||||
id: url,
|
||||
type: 'View' as 'View',
|
||||
actor: byActor.url,
|
||||
object: video.url
|
||||
}
|
||||
|
||||
expires: type === 'viewer'
|
||||
? new Date(VideoViewsManager.Instance.buildViewerExpireTime()).toISOString()
|
||||
: undefined
|
||||
},
|
||||
audience
|
||||
)
|
||||
if (viewersCount === undefined) {
|
||||
return audiencify(base, audience)
|
||||
}
|
||||
|
||||
return audiencify({
|
||||
...base,
|
||||
|
||||
expires: new Date(VideoViewsManager.Instance.buildViewerExpireTime()).toISOString(),
|
||||
|
||||
result: isUsingViewersFederationV2()
|
||||
? {
|
||||
interactionType: 'WatchAction',
|
||||
type: 'InteractionCounter',
|
||||
userInteractionCount: viewersCount
|
||||
}
|
||||
: undefined
|
||||
}, audience)
|
||||
}
|
||||
|
|
|
@ -4,7 +4,7 @@ import express from 'express'
|
|||
import { readFile } from 'fs/promises'
|
||||
import { join } from 'path'
|
||||
import { logger } from '../../../helpers/logger.js'
|
||||
import { CUSTOM_HTML_TAG_COMMENTS, FILES_CONTENT_HASH, PLUGIN_GLOBAL_CSS_PATH, WEBSERVER } from '../../../initializers/constants.js'
|
||||
import { CUSTOM_HTML_TAG_COMMENTS, FILES_CONTENT_HASH, PLUGIN_GLOBAL_CSS_PATH } from '../../../initializers/constants.js'
|
||||
import { ServerConfigManager } from '../../server-config-manager.js'
|
||||
import { TagsHtml } from './tags-html.js'
|
||||
import { pathExists } from 'fs-extra/esm'
|
||||
|
@ -94,7 +94,7 @@ export class PageHtml {
|
|||
|
||||
// Save locale in cookies
|
||||
res.cookie('clientLanguage', lang, {
|
||||
secure: WEBSERVER.SCHEME === 'https',
|
||||
secure: true,
|
||||
sameSite: 'none',
|
||||
maxAge: 1000 * 3600 * 24 * 90 // 3 months
|
||||
})
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
import { buildUUID, isTestOrDevInstance, sha256 } from '@peertube/peertube-node-utils'
|
||||
import { buildUUID, isTestOrDevInstance, isUsingViewersFederationV2, sha256 } from '@peertube/peertube-node-utils'
|
||||
import { exists } from '@server/helpers/custom-validators/misc.js'
|
||||
import { logger, loggerTagsFactory } from '@server/helpers/logger.js'
|
||||
import { VIEW_LIFETIME } from '@server/initializers/constants.js'
|
||||
import { sendView } from '@server/lib/activitypub/send/send-view.js'
|
||||
|
@ -17,6 +18,7 @@ type Viewer = {
|
|||
id: string
|
||||
viewerScope: ViewerScope
|
||||
videoScope: VideoScope
|
||||
viewerCount: number
|
||||
lastFederation?: number
|
||||
}
|
||||
|
||||
|
@ -31,7 +33,7 @@ export class VideoViewerCounters {
|
|||
private processingViewerCounters = false
|
||||
|
||||
constructor () {
|
||||
setInterval(() => this.cleanViewerCounters(), VIEW_LIFETIME.VIEWER_COUNTER)
|
||||
setInterval(() => this.cleanViewerCounters(), VIEW_LIFETIME.VIEWER_COUNTER * 0.75)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
@ -54,22 +56,48 @@ export class VideoViewerCounters {
|
|||
return false
|
||||
}
|
||||
|
||||
const newViewer = await this.addViewerToVideo({ viewerId, video, viewerScope: 'local' })
|
||||
const newViewer = this.addViewerToVideo({ viewerId, video, viewerScope: 'local', viewerCount: 1 })
|
||||
await this.federateViewerIfNeeded(video, newViewer)
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
async addRemoteViewer (options: {
|
||||
addRemoteViewerOnLocalVideo (options: {
|
||||
video: MVideo
|
||||
viewerId: string
|
||||
viewerExpires: Date
|
||||
}) {
|
||||
const { video, viewerExpires, viewerId } = options
|
||||
|
||||
logger.debug('Adding remote viewer to video %s.', video.uuid, { ...lTags(video.uuid) })
|
||||
logger.debug('Adding remote viewer to local video %s.', video.uuid, { viewerId, viewerExpires, ...lTags(video.uuid) })
|
||||
|
||||
await this.addViewerToVideo({ video, viewerExpires, viewerId, viewerScope: 'remote' })
|
||||
this.addViewerToVideo({ video, viewerExpires, viewerId, viewerScope: 'remote', viewerCount: 1 })
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
addRemoteViewerOnRemoteVideo (options: {
|
||||
video: MVideo
|
||||
viewerId: string
|
||||
viewerExpires: Date
|
||||
viewerResultCounter?: number
|
||||
}) {
|
||||
const { video, viewerExpires, viewerId, viewerResultCounter } = options
|
||||
|
||||
logger.debug(
|
||||
'Adding remote viewer to remote video %s.', video.uuid,
|
||||
{ viewerId, viewerResultCounter, viewerExpires, ...lTags(video.uuid) }
|
||||
)
|
||||
|
||||
this.addViewerToVideo({
|
||||
video,
|
||||
viewerExpires,
|
||||
viewerId,
|
||||
viewerScope: 'remote',
|
||||
// The origin server sends a summary of all viewers, so we can replace our local copy
|
||||
replaceCurrentViewers: exists(viewerResultCounter),
|
||||
viewerCount: viewerResultCounter ?? 1
|
||||
})
|
||||
|
||||
return true
|
||||
}
|
||||
|
@ -83,17 +111,17 @@ export class VideoViewerCounters {
|
|||
let total = 0
|
||||
|
||||
for (const viewers of this.viewersPerVideo.values()) {
|
||||
total += viewers.filter(v => v.viewerScope === options.viewerScope && v.videoScope === options.videoScope).length
|
||||
total += viewers.filter(v => v.viewerScope === options.viewerScope && v.videoScope === options.videoScope)
|
||||
.reduce((p, c) => p + c.viewerCount, 0)
|
||||
}
|
||||
|
||||
return total
|
||||
}
|
||||
|
||||
getViewers (video: MVideo) {
|
||||
getTotalViewersOf (video: MVideoImmutable) {
|
||||
const viewers = this.viewersPerVideo.get(video.id)
|
||||
if (!viewers) return 0
|
||||
|
||||
return viewers.length
|
||||
return viewers?.reduce((p, c) => p + c.viewerCount, 0) || 0
|
||||
}
|
||||
|
||||
buildViewerExpireTime () {
|
||||
|
@ -102,17 +130,19 @@ export class VideoViewerCounters {
|
|||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
private async addViewerToVideo (options: {
|
||||
private addViewerToVideo (options: {
|
||||
video: MVideoImmutable
|
||||
viewerId: string
|
||||
viewerScope: ViewerScope
|
||||
viewerCount: number
|
||||
replaceCurrentViewers?: boolean
|
||||
viewerExpires?: Date
|
||||
}) {
|
||||
const { video, viewerExpires, viewerId, viewerScope } = options
|
||||
const { video, viewerExpires, viewerId, viewerScope, viewerCount, replaceCurrentViewers } = options
|
||||
|
||||
let watchers = this.viewersPerVideo.get(video.id)
|
||||
|
||||
if (!watchers) {
|
||||
if (!watchers || replaceCurrentViewers) {
|
||||
watchers = []
|
||||
this.viewersPerVideo.set(video.id, watchers)
|
||||
}
|
||||
|
@ -125,12 +155,12 @@ export class VideoViewerCounters {
|
|||
? 'remote'
|
||||
: 'local'
|
||||
|
||||
const viewer = { id: viewerId, expires, videoScope, viewerScope }
|
||||
const viewer = { id: viewerId, expires, videoScope, viewerScope, viewerCount }
|
||||
watchers.push(viewer)
|
||||
|
||||
this.idToViewer.set(viewerId, viewer)
|
||||
|
||||
await this.notifyClients(video.id, watchers.length)
|
||||
this.notifyClients(video)
|
||||
|
||||
return viewer
|
||||
}
|
||||
|
@ -162,7 +192,16 @@ export class VideoViewerCounters {
|
|||
if (newViewers.length === 0) this.viewersPerVideo.delete(videoId)
|
||||
else this.viewersPerVideo.set(videoId, newViewers)
|
||||
|
||||
await this.notifyClients(videoId, newViewers.length)
|
||||
const video = await VideoModel.loadImmutableAttributes(videoId)
|
||||
|
||||
if (video) {
|
||||
this.notifyClients(video)
|
||||
|
||||
// Let total viewers expire on remote instances if there are no more viewers
|
||||
if (video.remote === false && newViewers.length !== 0) {
|
||||
await this.federateTotalViewers(video)
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
logger.error('Error in video clean viewers scheduler.', { err, ...lTags() })
|
||||
|
@ -171,13 +210,11 @@ export class VideoViewerCounters {
|
|||
this.processingViewerCounters = false
|
||||
}
|
||||
|
||||
private async notifyClients (videoId: string | number, viewersLength: number) {
|
||||
const video = await VideoModel.loadImmutableAttributes(videoId)
|
||||
if (!video) return
|
||||
private notifyClients (video: MVideoImmutable) {
|
||||
const totalViewers = this.getTotalViewersOf(video)
|
||||
PeerTubeSocket.Instance.sendVideoViewsUpdate(video, totalViewers)
|
||||
|
||||
PeerTubeSocket.Instance.sendVideoViewsUpdate(video, viewersLength)
|
||||
|
||||
logger.debug('Video viewers update for %s is %d.', video.url, viewersLength, lTags())
|
||||
logger.debug('Video viewers update for %s is %d.', video.url, totalViewers, lTags())
|
||||
}
|
||||
|
||||
private generateViewerId (ip: string, videoUUID: string) {
|
||||
|
@ -190,8 +227,26 @@ export class VideoViewerCounters {
|
|||
const federationLimit = now - (VIEW_LIFETIME.VIEWER_COUNTER * 0.75)
|
||||
|
||||
if (viewer.lastFederation && viewer.lastFederation > federationLimit) return
|
||||
if (video.remote === false && isUsingViewersFederationV2()) return
|
||||
|
||||
await sendView({
|
||||
byActor: await getServerActor(),
|
||||
video,
|
||||
viewersCount: 1,
|
||||
viewerIdentifier: viewer.id
|
||||
})
|
||||
|
||||
await sendView({ byActor: await getServerActor(), video, type: 'viewer', viewerIdentifier: viewer.id })
|
||||
viewer.lastFederation = now
|
||||
}
|
||||
|
||||
private async federateTotalViewers (video: MVideoImmutable) {
|
||||
if (!isUsingViewersFederationV2()) return
|
||||
|
||||
await sendView({
|
||||
byActor: await getServerActor(),
|
||||
video,
|
||||
viewersCount: this.getTotalViewersOf(video),
|
||||
viewerIdentifier: video.uuid
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,7 +3,7 @@ import { VideoViewEvent } from '@peertube/peertube-models'
|
|||
import { isTestOrDevInstance } from '@peertube/peertube-node-utils'
|
||||
import { GeoIP } from '@server/helpers/geo-ip.js'
|
||||
import { logger, loggerTagsFactory } from '@server/helpers/logger.js'
|
||||
import { MAX_LOCAL_VIEWER_WATCH_SECTIONS, VIEW_LIFETIME } from '@server/initializers/constants.js'
|
||||
import { MAX_LOCAL_VIEWER_WATCH_SECTIONS, VIEWER_SYNC_REDIS, VIEW_LIFETIME } from '@server/initializers/constants.js'
|
||||
import { sequelizeTypescript } from '@server/initializers/database.js'
|
||||
import { sendCreateWatchAction } from '@server/lib/activitypub/send/index.js'
|
||||
import { getLocalVideoViewerActivityPubUrl } from '@server/lib/activitypub/url.js'
|
||||
|
@ -33,11 +33,14 @@ type LocalViewerStats = {
|
|||
|
||||
export class VideoViewerStats {
|
||||
private processingViewersStats = false
|
||||
private processingRedisWrites = false
|
||||
|
||||
private readonly viewerCache = new Map<string, LocalViewerStats>()
|
||||
private readonly redisPendingWrites = new Map<string, { ip: string, videoId: number, stats: LocalViewerStats }>()
|
||||
|
||||
constructor () {
|
||||
setInterval(() => this.processViewerStats(), VIEW_LIFETIME.VIEWER_STATS)
|
||||
setInterval(() => this.syncRedisWrites(), VIEWER_SYNC_REDIS)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
@ -123,7 +126,7 @@ export class VideoViewerStats {
|
|||
|
||||
logger.debug('Set local video viewer stats for video %s.', video.uuid, { stats, ...lTags(video.uuid) })
|
||||
|
||||
await this.setLocalVideoViewer(ip, video.id, stats)
|
||||
this.setLocalVideoViewer(ip, video.id, stats)
|
||||
}
|
||||
|
||||
async processViewerStats () {
|
||||
|
@ -135,6 +138,8 @@ export class VideoViewerStats {
|
|||
const now = new Date().getTime()
|
||||
|
||||
try {
|
||||
await this.syncRedisWrites()
|
||||
|
||||
const allKeys = await Redis.Instance.listLocalVideoViewerKeys()
|
||||
|
||||
for (const key of allKeys) {
|
||||
|
@ -222,7 +227,7 @@ export class VideoViewerStats {
|
|||
const { viewerKey } = Redis.Instance.generateLocalVideoViewerKeys(ip, videoId)
|
||||
this.viewerCache.set(viewerKey, stats)
|
||||
|
||||
return Redis.Instance.setLocalVideoViewer(ip, videoId, stats)
|
||||
this.redisPendingWrites.set(viewerKey, { ip, videoId, stats })
|
||||
}
|
||||
|
||||
private deleteLocalVideoViewersKeys (key: string) {
|
||||
|
@ -230,4 +235,23 @@ export class VideoViewerStats {
|
|||
|
||||
return Redis.Instance.deleteLocalVideoViewersKeys(key)
|
||||
}
|
||||
|
||||
private async syncRedisWrites () {
|
||||
if (this.processingRedisWrites) return
|
||||
|
||||
this.processingRedisWrites = true
|
||||
|
||||
for (const [ key, pendingWrite ] of this.redisPendingWrites) {
|
||||
const { ip, videoId, stats } = pendingWrite
|
||||
this.redisPendingWrites.delete(key)
|
||||
|
||||
try {
|
||||
await Redis.Instance.setLocalVideoViewer(ip, videoId, stats)
|
||||
} catch (err) {
|
||||
logger.error('Cannot write viewer into redis', { ip, videoId, stats, err })
|
||||
}
|
||||
}
|
||||
|
||||
this.processingRedisWrites = false
|
||||
}
|
||||
}
|
||||
|
|
|
@ -35,7 +35,7 @@ export class VideoViews {
|
|||
|
||||
await this.addView(video)
|
||||
|
||||
await sendView({ byActor: await getServerActor(), video, type: 'view', viewerIdentifier: buildUUID() })
|
||||
await sendView({ byActor: await getServerActor(), video, viewerIdentifier: buildUUID() })
|
||||
|
||||
return true
|
||||
}
|
||||
|
|
|
@ -66,17 +66,29 @@ export class VideoViewsManager {
|
|||
video: MVideo
|
||||
viewerId: string | null
|
||||
viewerExpires?: Date
|
||||
viewerResultCounter?: number
|
||||
}) {
|
||||
const { video, viewerId, viewerExpires } = options
|
||||
const { video, viewerId, viewerExpires, viewerResultCounter } = options
|
||||
|
||||
logger.debug('Processing remote view for %s.', video.url, { viewerExpires, viewerId, ...lTags() })
|
||||
|
||||
if (viewerExpires) await this.videoViewerCounters.addRemoteViewer({ video, viewerId, viewerExpires })
|
||||
else await this.videoViews.addRemoteView({ video })
|
||||
// Viewer
|
||||
if (viewerExpires) {
|
||||
if (video.remote === false) {
|
||||
this.videoViewerCounters.addRemoteViewerOnLocalVideo({ video, viewerId, viewerExpires })
|
||||
return
|
||||
}
|
||||
|
||||
this.videoViewerCounters.addRemoteViewerOnRemoteVideo({ video, viewerId, viewerExpires, viewerResultCounter })
|
||||
return
|
||||
}
|
||||
|
||||
// Just a view
|
||||
await this.videoViews.addRemoteView({ video })
|
||||
}
|
||||
|
||||
getViewers (video: MVideo) {
|
||||
return this.videoViewerCounters.getViewers(video)
|
||||
getTotalViewersOf (video: MVideo) {
|
||||
return this.videoViewerCounters.getTotalViewersOf(video)
|
||||
}
|
||||
|
||||
getTotalViewers (options: {
|
||||
|
|
|
@ -1,4 +1,3 @@
|
|||
import Bluebird from 'bluebird'
|
||||
import { NextFunction, Request, RequestHandler, Response } from 'express'
|
||||
import { ValidationChain } from 'express-validator'
|
||||
import { ExpressPromiseHandler } from '@server/types/express-handler.js'
|
||||
|
@ -9,22 +8,27 @@ import { retryTransactionWrapper } from '../helpers/database-utils.js'
|
|||
export type RequestPromiseHandler = ValidationChain | ExpressPromiseHandler
|
||||
|
||||
function asyncMiddleware (fun: RequestPromiseHandler | RequestPromiseHandler[]) {
|
||||
return (req: Request, res: Response, next: NextFunction) => {
|
||||
if (Array.isArray(fun) === true) {
|
||||
return Bluebird.each(fun as RequestPromiseHandler[], f => {
|
||||
return new Promise<void>((resolve, reject) => {
|
||||
return async (req: Request, res: Response, next: NextFunction) => {
|
||||
if (Array.isArray(fun) !== true) {
|
||||
return Promise.resolve((fun as RequestHandler)(req, res, next))
|
||||
.catch(err => next(err))
|
||||
}
|
||||
|
||||
try {
|
||||
for (const f of (fun as RequestPromiseHandler[])) {
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
return asyncMiddleware(f)(req, res, err => {
|
||||
if (err) return reject(err)
|
||||
|
||||
return resolve()
|
||||
})
|
||||
})
|
||||
}).then(() => next())
|
||||
.catch(err => next(err))
|
||||
}
|
||||
}
|
||||
|
||||
return Promise.resolve((fun as RequestHandler)(req, res, next))
|
||||
.catch(err => next(err))
|
||||
next()
|
||||
} catch (err) {
|
||||
next(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -90,7 +90,7 @@ export function videoModelToFormattedJSON (video: MVideoFormattable, options: Vi
|
|||
duration: video.duration,
|
||||
|
||||
views: video.views,
|
||||
viewers: VideoViewsManager.Instance.getViewers(video),
|
||||
viewers: VideoViewsManager.Instance.getTotalViewersOf(video),
|
||||
|
||||
likes: video.likes,
|
||||
dislikes: video.dislikes,
|
||||
|
|
|
@ -213,9 +213,6 @@ app.use(express.json({
|
|||
}
|
||||
}))
|
||||
|
||||
// Cookies
|
||||
app.use(cookieParser())
|
||||
|
||||
// W3C DNT Tracking Status
|
||||
app.use(advertiseDoNotTrack)
|
||||
|
||||
|
@ -230,9 +227,6 @@ app.use('/api/' + API_VERSION, apiRouter)
|
|||
// Services (oembed...)
|
||||
app.use('/services', servicesRouter)
|
||||
|
||||
// Plugins & themes
|
||||
app.use('/', pluginsRouter)
|
||||
|
||||
app.use('/', activityPubRouter)
|
||||
app.use('/', feedsRouter)
|
||||
app.use('/', trackerRouter)
|
||||
|
@ -246,6 +240,12 @@ app.use('/', downloadRouter)
|
|||
app.use('/', lazyStaticRouter)
|
||||
app.use('/', objectStorageProxyRouter)
|
||||
|
||||
// Cookies for plugins and HTML
|
||||
app.use(cookieParser())
|
||||
|
||||
// Plugins & themes
|
||||
app.use('/', pluginsRouter)
|
||||
|
||||
// Client files, last valid routes!
|
||||
const cliOptions = cli.opts<{ client: boolean, plugins: boolean }>()
|
||||
if (cliOptions.client) app.use('/', clientsRouter)
|
||||
|
|
Loading…
Reference in New Issue
Block a user