PeerTube_original/packages/tests/src/api/views/video-views-counter.ts
Chocobozzz 2af050cbf3
Viewers federation protocol v2
More efficient than the current one where instance is not fast enough to
send all viewers if a video becomes popular

The new protocol can be enabled by setting env
USE_VIEWERS_FEDERATION_V2='true'

Introduce a result field in View activity that contains the number of
viewers. This field is used by the origin instance to send the total
viewers on the video to remote instances. The difference with the
current protocol is that we don't have to send viewers individually to
remote instances.

There are 4 cases:
 * View activity from federation on Remote Video -> instance replaces
   all current viewers by a new viewer that contains the result counter
 * View activity from federation on Local Video -> instance adds the
   viewer without considering the result counter
 * Local view on Remote Video -> instance adds the viewer and send it to
   the origin instance
 * Local view on Local Video -> instance adds the viewer

Periodically PeerTube cleanups expired viewers. On local videos, the
instance sends to remote instances a View activity with the result
counter so they can update their viewers counter for that particular
video
2023-12-01 15:21:17 +01:00

192 lines
6.5 KiB
TypeScript

/* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */
import { expect } from 'chai'
import { FfmpegCommand } from 'fluent-ffmpeg'
import { prepareViewsServers, prepareViewsVideos, processViewsBuffer } from '@tests/shared/views.js'
import { wait } from '@peertube/peertube-core-utils'
import { cleanupTests, PeerTubeServer, stopFfmpeg, waitJobs } from '@peertube/peertube-server-commands'
describe('Test video views/viewers counters', function () {
let servers: PeerTubeServer[]
async function checkCounter (field: 'views' | 'viewers', id: string, expected: number) {
for (const server of servers) {
const video = await server.videos.get({ id })
const messageSuffix = video.isLive
? 'live video'
: 'vod video'
expect(video[field]).to.equal(expected, `${field} not valid on server ${server.serverNumber} for ${messageSuffix} ${video.uuid}`)
}
}
function runTests () {
describe('Test views counter on VOD', function () {
let videoUUID: string
before(async function () {
this.timeout(120000)
const { uuid } = await servers[0].videos.quickUpload({ name: 'video' })
videoUUID = uuid
await waitJobs(servers)
})
it('Should not view a video if watch time is below the threshold', async function () {
await servers[0].views.simulateViewer({ id: videoUUID, currentTimes: [ 1, 2 ] })
await processViewsBuffer(servers)
await checkCounter('views', videoUUID, 0)
})
it('Should view a video if watch time is above the threshold', async function () {
await servers[0].views.simulateViewer({ id: videoUUID, currentTimes: [ 1, 4 ] })
await processViewsBuffer(servers)
await checkCounter('views', videoUUID, 1)
})
it('Should not view again this video with the same IP', async function () {
await servers[0].views.simulateViewer({ id: videoUUID, xForwardedFor: '0.0.0.1,127.0.0.1', currentTimes: [ 1, 4 ] })
await servers[0].views.simulateViewer({ id: videoUUID, xForwardedFor: '0.0.0.1,127.0.0.1', currentTimes: [ 1, 4 ] })
await processViewsBuffer(servers)
await checkCounter('views', videoUUID, 2)
})
it('Should view the video from server 2 and send the event', async function () {
await servers[1].views.simulateViewer({ id: videoUUID, currentTimes: [ 1, 4 ] })
await waitJobs(servers)
await processViewsBuffer(servers)
await checkCounter('views', videoUUID, 3)
})
})
describe('Test views and viewers counters on live and VOD', function () {
let liveVideoId: string
let vodVideoId: string
let command: FfmpegCommand
before(async function () {
this.timeout(240000);
({ vodVideoId, liveVideoId, ffmpegCommand: command } = await prepareViewsVideos({ servers, live: true, vod: true }))
})
it('Should display no views and viewers', async function () {
await checkCounter('views', liveVideoId, 0)
await checkCounter('viewers', liveVideoId, 0)
await checkCounter('views', vodVideoId, 0)
await checkCounter('viewers', vodVideoId, 0)
})
it('Should view twice and display 1 view/viewer', async function () {
this.timeout(30000)
for (let i = 0; i < 3; i++) {
await servers[0].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] })
await servers[0].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] })
await servers[0].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] })
await servers[0].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] })
await wait(1000)
}
await waitJobs(servers)
await checkCounter('viewers', liveVideoId, 1)
await checkCounter('viewers', vodVideoId, 1)
await processViewsBuffer(servers)
await checkCounter('views', liveVideoId, 1)
await checkCounter('views', vodVideoId, 1)
})
it('Should wait and display 0 viewers but still have 1 view', async function () {
this.timeout(45000)
let error = false
do {
try {
await checkCounter('views', liveVideoId, 1)
await checkCounter('viewers', liveVideoId, 0)
await checkCounter('views', vodVideoId, 1)
await checkCounter('viewers', vodVideoId, 0)
error = false
await wait(2500)
} catch {
error = true
}
} while (error)
})
it('Should view on a remote and on local and display appropriate views/viewers', async function () {
this.timeout(30000)
await servers[0].views.simulateViewer({ id: vodVideoId, xForwardedFor: '0.0.0.1,127.0.0.1', currentTimes: [ 0, 5 ] })
await servers[0].views.simulateViewer({ id: vodVideoId, xForwardedFor: '0.0.0.1,127.0.0.1', currentTimes: [ 0, 5 ] })
await servers[0].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] })
await servers[1].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] })
await servers[1].views.simulateViewer({ id: vodVideoId, currentTimes: [ 0, 5 ] })
await servers[0].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] })
await servers[1].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] })
await servers[1].views.simulateViewer({ id: liveVideoId, currentTimes: [ 0, 35 ] })
await wait(3000) // Throttled federation
await waitJobs(servers)
await checkCounter('viewers', liveVideoId, 2)
await checkCounter('viewers', vodVideoId, 3)
await processViewsBuffer(servers)
await checkCounter('views', liveVideoId, 3)
await checkCounter('views', vodVideoId, 4)
})
after(async function () {
await stopFfmpeg(command)
})
})
}
describe('Federation V1', function () {
before(async function () {
this.timeout(120000)
servers = await prepareViewsServers({ viewExpiration: '5 seconds', viewersFederationV2: false })
})
runTests()
after(async function () {
await cleanupTests(servers)
})
})
describe('Federation V2', function () {
before(async function () {
this.timeout(120000)
servers = await prepareViewsServers({ viewExpiration: '5 seconds', viewersFederationV2: true })
})
runTests()
after(async function () {
await cleanupTests(servers)
})
})
})