Send views in a dedicated queue
This commit is contained in:
parent
3396e65345
commit
f27b7a750f
|
@ -25,6 +25,7 @@ export class JobsComponent extends RestTable implements OnInit {
|
||||||
|
|
||||||
'activitypub-follow',
|
'activitypub-follow',
|
||||||
'activitypub-http-broadcast',
|
'activitypub-http-broadcast',
|
||||||
|
'activitypub-http-broadcast-parallel',
|
||||||
'activitypub-http-fetcher',
|
'activitypub-http-fetcher',
|
||||||
'activitypub-http-unicast',
|
'activitypub-http-unicast',
|
||||||
'activitypub-refresher',
|
'activitypub-refresher',
|
||||||
|
|
|
@ -139,6 +139,7 @@ const REMOTE_SCHEME = {
|
||||||
|
|
||||||
const JOB_ATTEMPTS: { [id in JobType]: number } = {
|
const JOB_ATTEMPTS: { [id in JobType]: number } = {
|
||||||
'activitypub-http-broadcast': 1,
|
'activitypub-http-broadcast': 1,
|
||||||
|
'activitypub-http-broadcast-parallel': 1,
|
||||||
'activitypub-http-unicast': 1,
|
'activitypub-http-unicast': 1,
|
||||||
'activitypub-http-fetcher': 2,
|
'activitypub-http-fetcher': 2,
|
||||||
'activitypub-follow': 5,
|
'activitypub-follow': 5,
|
||||||
|
@ -159,6 +160,7 @@ const JOB_ATTEMPTS: { [id in JobType]: number } = {
|
||||||
// Excluded keys are jobs that can be configured by admins
|
// Excluded keys are jobs that can be configured by admins
|
||||||
const JOB_CONCURRENCY: { [id in Exclude<JobType, 'video-transcoding' | 'video-import'>]: number } = {
|
const JOB_CONCURRENCY: { [id in Exclude<JobType, 'video-transcoding' | 'video-import'>]: number } = {
|
||||||
'activitypub-http-broadcast': 1,
|
'activitypub-http-broadcast': 1,
|
||||||
|
'activitypub-http-broadcast-parallel': 30,
|
||||||
'activitypub-http-unicast': 10,
|
'activitypub-http-unicast': 10,
|
||||||
'activitypub-http-fetcher': 3,
|
'activitypub-http-fetcher': 3,
|
||||||
'activitypub-cleaner': 1,
|
'activitypub-cleaner': 1,
|
||||||
|
@ -176,6 +178,7 @@ const JOB_CONCURRENCY: { [id in Exclude<JobType, 'video-transcoding' | 'video-im
|
||||||
}
|
}
|
||||||
const JOB_TTL: { [id in JobType]: number } = {
|
const JOB_TTL: { [id in JobType]: number } = {
|
||||||
'activitypub-http-broadcast': 60000 * 10, // 10 minutes
|
'activitypub-http-broadcast': 60000 * 10, // 10 minutes
|
||||||
|
'activitypub-http-broadcast-parallel': 60000 * 10, // 10 minutes
|
||||||
'activitypub-http-unicast': 60000 * 10, // 10 minutes
|
'activitypub-http-unicast': 60000 * 10, // 10 minutes
|
||||||
'activitypub-http-fetcher': 1000 * 3600 * 10, // 10 hours
|
'activitypub-http-fetcher': 1000 * 3600 * 10, // 10 hours
|
||||||
'activitypub-follow': 60000 * 10, // 10 minutes
|
'activitypub-follow': 60000 * 10, // 10 minutes
|
||||||
|
|
|
@ -26,7 +26,7 @@ async function sendView (options: {
|
||||||
return buildViewActivity({ url, byActor, video, audience, type })
|
return buildViewActivity({ url, byActor, video, audience, type })
|
||||||
}
|
}
|
||||||
|
|
||||||
return sendVideoRelatedActivity(activityBuilder, { byActor, video, transaction, contextType: 'View' })
|
return sendVideoRelatedActivity(activityBuilder, { byActor, video, transaction, contextType: 'View', parallelizable: true })
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
|
|
|
@ -15,9 +15,10 @@ async function sendVideoRelatedActivity (activityBuilder: (audience: ActivityAud
|
||||||
byActor: MActorLight
|
byActor: MActorLight
|
||||||
video: MVideoImmutable | MVideoAccountLight
|
video: MVideoImmutable | MVideoAccountLight
|
||||||
contextType: ContextType
|
contextType: ContextType
|
||||||
|
parallelizable?: boolean
|
||||||
transaction?: Transaction
|
transaction?: Transaction
|
||||||
}) {
|
}) {
|
||||||
const { byActor, video, transaction, contextType } = options
|
const { byActor, video, transaction, contextType, parallelizable } = options
|
||||||
|
|
||||||
// Send to origin
|
// Send to origin
|
||||||
if (video.isOwned() === false) {
|
if (video.isOwned() === false) {
|
||||||
|
@ -38,6 +39,7 @@ async function sendVideoRelatedActivity (activityBuilder: (audience: ActivityAud
|
||||||
toFollowersOf: actorsInvolvedInVideo,
|
toFollowersOf: actorsInvolvedInVideo,
|
||||||
transaction,
|
transaction,
|
||||||
actorsException,
|
actorsException,
|
||||||
|
parallelizable,
|
||||||
contextType
|
contextType
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -130,9 +132,10 @@ async function broadcastToFollowers (options: {
|
||||||
transaction: Transaction
|
transaction: Transaction
|
||||||
contextType: ContextType
|
contextType: ContextType
|
||||||
|
|
||||||
|
parallelizable?: boolean
|
||||||
actorsException?: MActorWithInboxes[]
|
actorsException?: MActorWithInboxes[]
|
||||||
}) {
|
}) {
|
||||||
const { data, byActor, toFollowersOf, transaction, contextType, actorsException = [] } = options
|
const { data, byActor, toFollowersOf, transaction, contextType, actorsException = [], parallelizable } = options
|
||||||
|
|
||||||
const uris = await computeFollowerUris(toFollowersOf, actorsException, transaction)
|
const uris = await computeFollowerUris(toFollowersOf, actorsException, transaction)
|
||||||
|
|
||||||
|
@ -141,6 +144,7 @@ async function broadcastToFollowers (options: {
|
||||||
uris,
|
uris,
|
||||||
data,
|
data,
|
||||||
byActor,
|
byActor,
|
||||||
|
parallelizable,
|
||||||
contextType
|
contextType
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
@ -173,8 +177,9 @@ function broadcastTo (options: {
|
||||||
data: any
|
data: any
|
||||||
byActor: MActorId
|
byActor: MActorId
|
||||||
contextType: ContextType
|
contextType: ContextType
|
||||||
|
parallelizable?: boolean // default to false
|
||||||
}) {
|
}) {
|
||||||
const { uris, data, byActor, contextType } = options
|
const { uris, data, byActor, contextType, parallelizable } = options
|
||||||
|
|
||||||
if (uris.length === 0) return undefined
|
if (uris.length === 0) return undefined
|
||||||
|
|
||||||
|
@ -200,7 +205,13 @@ function broadcastTo (options: {
|
||||||
contextType
|
contextType
|
||||||
}
|
}
|
||||||
|
|
||||||
JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload })
|
JobQueue.Instance.createJob({
|
||||||
|
type: parallelizable
|
||||||
|
? 'activitypub-http-broadcast-parallel'
|
||||||
|
: 'activitypub-http-broadcast',
|
||||||
|
|
||||||
|
payload
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
for (const unicastUri of unicastUris) {
|
for (const unicastUri of unicastUris) {
|
||||||
|
|
|
@ -43,6 +43,7 @@ import { processVideosViewsStats } from './handlers/video-views-stats'
|
||||||
|
|
||||||
type CreateJobArgument =
|
type CreateJobArgument =
|
||||||
{ type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
|
{ type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
|
||||||
|
{ type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload } |
|
||||||
{ type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } |
|
{ type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } |
|
||||||
{ type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } |
|
{ type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } |
|
||||||
{ type: 'activitypub-http-cleaner', payload: {} } |
|
{ type: 'activitypub-http-cleaner', payload: {} } |
|
||||||
|
@ -68,6 +69,7 @@ export type CreateJobOptions = {
|
||||||
|
|
||||||
const handlers: { [id in JobType]: (job: Job) => Promise<any> } = {
|
const handlers: { [id in JobType]: (job: Job) => Promise<any> } = {
|
||||||
'activitypub-http-broadcast': processActivityPubHttpBroadcast,
|
'activitypub-http-broadcast': processActivityPubHttpBroadcast,
|
||||||
|
'activitypub-http-broadcast-parallel': processActivityPubHttpBroadcast,
|
||||||
'activitypub-http-unicast': processActivityPubHttpUnicast,
|
'activitypub-http-unicast': processActivityPubHttpUnicast,
|
||||||
'activitypub-http-fetcher': processActivityPubHttpFetcher,
|
'activitypub-http-fetcher': processActivityPubHttpFetcher,
|
||||||
'activitypub-cleaner': processActivityPubCleaner,
|
'activitypub-cleaner': processActivityPubCleaner,
|
||||||
|
@ -89,6 +91,7 @@ const handlers: { [id in JobType]: (job: Job) => Promise<any> } = {
|
||||||
const jobTypes: JobType[] = [
|
const jobTypes: JobType[] = [
|
||||||
'activitypub-follow',
|
'activitypub-follow',
|
||||||
'activitypub-http-broadcast',
|
'activitypub-http-broadcast',
|
||||||
|
'activitypub-http-broadcast-parallel',
|
||||||
'activitypub-http-fetcher',
|
'activitypub-http-fetcher',
|
||||||
'activitypub-http-unicast',
|
'activitypub-http-unicast',
|
||||||
'activitypub-cleaner',
|
'activitypub-cleaner',
|
||||||
|
|
|
@ -9,6 +9,7 @@ export type JobState = 'active' | 'completed' | 'failed' | 'waiting' | 'delayed'
|
||||||
export type JobType =
|
export type JobType =
|
||||||
| 'activitypub-http-unicast'
|
| 'activitypub-http-unicast'
|
||||||
| 'activitypub-http-broadcast'
|
| 'activitypub-http-broadcast'
|
||||||
|
| 'activitypub-http-broadcast-parallel'
|
||||||
| 'activitypub-http-fetcher'
|
| 'activitypub-http-fetcher'
|
||||||
| 'activitypub-cleaner'
|
| 'activitypub-cleaner'
|
||||||
| 'activitypub-follow'
|
| 'activitypub-follow'
|
||||||
|
|
Loading…
Reference in New Issue
Block a user