Optimize activity actor load in AP processors
This commit is contained in:
parent
d4defe07d2
commit
e587e0ecee
|
@ -89,7 +89,7 @@ async function searchVideoChannelURI (search: string, isWebfingerSearch: boolean
|
||||||
|
|
||||||
if (isUserAbleToSearchRemoteURI(res)) {
|
if (isUserAbleToSearchRemoteURI(res)) {
|
||||||
try {
|
try {
|
||||||
const actor = await getOrCreateActorAndServerAndModel(uri, true, true)
|
const actor = await getOrCreateActorAndServerAndModel(uri, 'all', true, true)
|
||||||
videoChannel = actor.VideoChannel
|
videoChannel = actor.VideoChannel
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
logger.info('Cannot search remote video channel %s.', uri, { err })
|
logger.info('Cannot search remote video channel %s.', uri, { err })
|
||||||
|
|
13
server/helpers/actor.ts
Normal file
13
server/helpers/actor.ts
Normal file
|
@ -0,0 +1,13 @@
|
||||||
|
import { ActorModel } from '../models/activitypub/actor'
|
||||||
|
|
||||||
|
type ActorFetchByUrlType = 'all' | 'actor-and-association-ids'
|
||||||
|
function fetchActorByUrl (url: string, fetchType: ActorFetchByUrlType) {
|
||||||
|
if (fetchType === 'all') return ActorModel.loadByUrlAndPopulateAccountAndChannel(url)
|
||||||
|
|
||||||
|
if (fetchType === 'actor-and-association-ids') return ActorModel.loadByUrl(url)
|
||||||
|
}
|
||||||
|
|
||||||
|
export {
|
||||||
|
ActorFetchByUrlType,
|
||||||
|
fetchActorByUrl
|
||||||
|
}
|
|
@ -21,6 +21,7 @@ import { ServerModel } from '../../models/server/server'
|
||||||
import { VideoChannelModel } from '../../models/video/video-channel'
|
import { VideoChannelModel } from '../../models/video/video-channel'
|
||||||
import { JobQueue } from '../job-queue'
|
import { JobQueue } from '../job-queue'
|
||||||
import { getServerActor } from '../../helpers/utils'
|
import { getServerActor } from '../../helpers/utils'
|
||||||
|
import { ActorFetchByUrlType, fetchActorByUrl } from '../../helpers/actor'
|
||||||
|
|
||||||
// Set account keys, this could be long so process after the account creation and do not block the client
|
// Set account keys, this could be long so process after the account creation and do not block the client
|
||||||
function setAsyncActorKeys (actor: ActorModel) {
|
function setAsyncActorKeys (actor: ActorModel) {
|
||||||
|
@ -38,13 +39,14 @@ function setAsyncActorKeys (actor: ActorModel) {
|
||||||
|
|
||||||
async function getOrCreateActorAndServerAndModel (
|
async function getOrCreateActorAndServerAndModel (
|
||||||
activityActor: string | ActivityPubActor,
|
activityActor: string | ActivityPubActor,
|
||||||
|
fetchType: ActorFetchByUrlType = 'actor-and-association-ids',
|
||||||
recurseIfNeeded = true,
|
recurseIfNeeded = true,
|
||||||
updateCollections = false
|
updateCollections = false
|
||||||
) {
|
) {
|
||||||
const actorUrl = getActorUrl(activityActor)
|
const actorUrl = getActorUrl(activityActor)
|
||||||
let created = false
|
let created = false
|
||||||
|
|
||||||
let actor = await ActorModel.loadByUrl(actorUrl)
|
let actor = await fetchActorByUrl(actorUrl, fetchType)
|
||||||
// Orphan actor (not associated to an account of channel) so recreate it
|
// Orphan actor (not associated to an account of channel) so recreate it
|
||||||
if (actor && (!actor.Account && !actor.VideoChannel)) {
|
if (actor && (!actor.Account && !actor.VideoChannel)) {
|
||||||
await actor.destroy()
|
await actor.destroy()
|
||||||
|
@ -65,7 +67,7 @@ async function getOrCreateActorAndServerAndModel (
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// Assert we don't recurse another time
|
// Assert we don't recurse another time
|
||||||
ownerActor = await getOrCreateActorAndServerAndModel(accountAttributedTo.id, false)
|
ownerActor = await getOrCreateActorAndServerAndModel(accountAttributedTo.id, 'all', false)
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
logger.error('Cannot get or create account attributed to video channel ' + actor.url)
|
logger.error('Cannot get or create account attributed to video channel ' + actor.url)
|
||||||
throw new Error(err)
|
throw new Error(err)
|
||||||
|
@ -76,10 +78,7 @@ async function getOrCreateActorAndServerAndModel (
|
||||||
created = true
|
created = true
|
||||||
}
|
}
|
||||||
|
|
||||||
if (actor.Account) actor.Account.Actor = actor
|
const { actor: actorRefreshed, refreshed } = await retryTransactionWrapper(refreshActorIfNeeded, actor, fetchType)
|
||||||
if (actor.VideoChannel) actor.VideoChannel.Actor = actor
|
|
||||||
|
|
||||||
const { actor: actorRefreshed, refreshed } = await retryTransactionWrapper(refreshActorIfNeeded, actor)
|
|
||||||
if (!actorRefreshed) throw new Error('Actor ' + actorRefreshed.url + ' does not exist anymore.')
|
if (!actorRefreshed) throw new Error('Actor ' + actorRefreshed.url + ' does not exist anymore.')
|
||||||
|
|
||||||
if ((created === true || refreshed === true) && updateCollections === true) {
|
if ((created === true || refreshed === true) && updateCollections === true) {
|
||||||
|
@ -370,8 +369,14 @@ async function saveVideoChannel (actor: ActorModel, result: FetchRemoteActorResu
|
||||||
return videoChannelCreated
|
return videoChannelCreated
|
||||||
}
|
}
|
||||||
|
|
||||||
async function refreshActorIfNeeded (actor: ActorModel): Promise<{ actor: ActorModel, refreshed: boolean }> {
|
async function refreshActorIfNeeded (
|
||||||
if (!actor.isOutdated()) return { actor, refreshed: false }
|
actorArg: ActorModel,
|
||||||
|
fetchedType: ActorFetchByUrlType
|
||||||
|
): Promise<{ actor: ActorModel, refreshed: boolean }> {
|
||||||
|
if (!actorArg.isOutdated()) return { actor: actorArg, refreshed: false }
|
||||||
|
|
||||||
|
// We need more attributes
|
||||||
|
const actor = fetchedType === 'all' ? actorArg : await ActorModel.loadByUrlAndPopulateAccountAndChannel(actorArg.url)
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const actorUrl = await getUrlFromWebfinger(actor.preferredUsername + '@' + actor.getHost())
|
const actorUrl = await getUrlFromWebfinger(actor.preferredUsername + '@' + actor.getHost())
|
||||||
|
|
|
@ -1,10 +1,9 @@
|
||||||
import { CacheFileObject } from '../../../shared/index'
|
import { CacheFileObject } from '../../../shared/index'
|
||||||
import { VideoModel } from '../../models/video/video'
|
import { VideoModel } from '../../models/video/video'
|
||||||
import { ActorModel } from '../../models/activitypub/actor'
|
|
||||||
import { sequelizeTypescript } from '../../initializers'
|
import { sequelizeTypescript } from '../../initializers'
|
||||||
import { VideoRedundancyModel } from '../../models/redundancy/video-redundancy'
|
import { VideoRedundancyModel } from '../../models/redundancy/video-redundancy'
|
||||||
|
|
||||||
function cacheFileActivityObjectToDBAttributes (cacheFileObject: CacheFileObject, video: VideoModel, byActor: ActorModel) {
|
function cacheFileActivityObjectToDBAttributes (cacheFileObject: CacheFileObject, video: VideoModel, byActor: { id?: number }) {
|
||||||
const url = cacheFileObject.url
|
const url = cacheFileObject.url
|
||||||
|
|
||||||
const videoFile = video.VideoFiles.find(f => {
|
const videoFile = video.VideoFiles.find(f => {
|
||||||
|
@ -23,7 +22,7 @@ function cacheFileActivityObjectToDBAttributes (cacheFileObject: CacheFileObject
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
function createCacheFile (cacheFileObject: CacheFileObject, video: VideoModel, byActor: ActorModel) {
|
function createCacheFile (cacheFileObject: CacheFileObject, video: VideoModel, byActor: { id?: number }) {
|
||||||
return sequelizeTypescript.transaction(async t => {
|
return sequelizeTypescript.transaction(async t => {
|
||||||
const attributes = cacheFileActivityObjectToDBAttributes(cacheFileObject, video, byActor)
|
const attributes = cacheFileActivityObjectToDBAttributes(cacheFileObject, video, byActor)
|
||||||
|
|
||||||
|
@ -31,7 +30,7 @@ function createCacheFile (cacheFileObject: CacheFileObject, video: VideoModel, b
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
function updateCacheFile (cacheFileObject: CacheFileObject, redundancyModel: VideoRedundancyModel, byActor: ActorModel) {
|
function updateCacheFile (cacheFileObject: CacheFileObject, redundancyModel: VideoRedundancyModel, byActor: { id?: number }) {
|
||||||
const attributes = cacheFileActivityObjectToDBAttributes(cacheFileObject, redundancyModel.VideoFile.Video, byActor)
|
const attributes = cacheFileActivityObjectToDBAttributes(cacheFileObject, redundancyModel.VideoFile.Video, byActor)
|
||||||
|
|
||||||
redundancyModel.set('expires', attributes.expiresOn)
|
redundancyModel.set('expires', attributes.expiresOn)
|
||||||
|
|
|
@ -1,15 +1,11 @@
|
||||||
import { ActivityAccept } from '../../../../shared/models/activitypub'
|
import { ActivityAccept } from '../../../../shared/models/activitypub'
|
||||||
import { getActorUrl } from '../../../helpers/activitypub'
|
|
||||||
import { ActorModel } from '../../../models/activitypub/actor'
|
import { ActorModel } from '../../../models/activitypub/actor'
|
||||||
import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
|
import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
|
||||||
import { addFetchOutboxJob } from '../actor'
|
import { addFetchOutboxJob } from '../actor'
|
||||||
|
|
||||||
async function processAcceptActivity (activity: ActivityAccept, inboxActor?: ActorModel) {
|
async function processAcceptActivity (activity: ActivityAccept, targetActor: ActorModel, inboxActor?: ActorModel) {
|
||||||
if (inboxActor === undefined) throw new Error('Need to accept on explicit inbox.')
|
if (inboxActor === undefined) throw new Error('Need to accept on explicit inbox.')
|
||||||
|
|
||||||
const actorUrl = getActorUrl(activity.actor)
|
|
||||||
const targetActor = await ActorModel.loadByUrl(actorUrl)
|
|
||||||
|
|
||||||
return processAccept(inboxActor, targetActor)
|
return processAccept(inboxActor, targetActor)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2,15 +2,11 @@ import { ActivityAnnounce } from '../../../../shared/models/activitypub'
|
||||||
import { retryTransactionWrapper } from '../../../helpers/database-utils'
|
import { retryTransactionWrapper } from '../../../helpers/database-utils'
|
||||||
import { sequelizeTypescript } from '../../../initializers'
|
import { sequelizeTypescript } from '../../../initializers'
|
||||||
import { ActorModel } from '../../../models/activitypub/actor'
|
import { ActorModel } from '../../../models/activitypub/actor'
|
||||||
import { VideoModel } from '../../../models/video/video'
|
|
||||||
import { VideoShareModel } from '../../../models/video/video-share'
|
import { VideoShareModel } from '../../../models/video/video-share'
|
||||||
import { getOrCreateActorAndServerAndModel } from '../actor'
|
|
||||||
import { forwardVideoRelatedActivity } from '../send/utils'
|
import { forwardVideoRelatedActivity } from '../send/utils'
|
||||||
import { getOrCreateVideoAndAccountAndChannel } from '../videos'
|
import { getOrCreateVideoAndAccountAndChannel } from '../videos'
|
||||||
|
|
||||||
async function processAnnounceActivity (activity: ActivityAnnounce) {
|
async function processAnnounceActivity (activity: ActivityAnnounce, actorAnnouncer: ActorModel) {
|
||||||
const actorAnnouncer = await getOrCreateActorAndServerAndModel(activity.actor)
|
|
||||||
|
|
||||||
return retryTransactionWrapper(processVideoShare, actorAnnouncer, activity)
|
return retryTransactionWrapper(processVideoShare, actorAnnouncer, activity)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -7,30 +7,28 @@ import { sequelizeTypescript } from '../../../initializers'
|
||||||
import { AccountVideoRateModel } from '../../../models/account/account-video-rate'
|
import { AccountVideoRateModel } from '../../../models/account/account-video-rate'
|
||||||
import { ActorModel } from '../../../models/activitypub/actor'
|
import { ActorModel } from '../../../models/activitypub/actor'
|
||||||
import { VideoAbuseModel } from '../../../models/video/video-abuse'
|
import { VideoAbuseModel } from '../../../models/video/video-abuse'
|
||||||
import { getOrCreateActorAndServerAndModel } from '../actor'
|
|
||||||
import { addVideoComment, resolveThread } from '../video-comments'
|
import { addVideoComment, resolveThread } from '../video-comments'
|
||||||
import { getOrCreateVideoAndAccountAndChannel } from '../videos'
|
import { getOrCreateVideoAndAccountAndChannel } from '../videos'
|
||||||
import { forwardActivity, forwardVideoRelatedActivity } from '../send/utils'
|
import { forwardActivity, forwardVideoRelatedActivity } from '../send/utils'
|
||||||
import { Redis } from '../../redis'
|
import { Redis } from '../../redis'
|
||||||
import { createCacheFile } from '../cache-file'
|
import { createCacheFile } from '../cache-file'
|
||||||
|
|
||||||
async function processCreateActivity (activity: ActivityCreate) {
|
async function processCreateActivity (activity: ActivityCreate, byActor: ActorModel) {
|
||||||
const activityObject = activity.object
|
const activityObject = activity.object
|
||||||
const activityType = activityObject.type
|
const activityType = activityObject.type
|
||||||
const actor = await getOrCreateActorAndServerAndModel(activity.actor)
|
|
||||||
|
|
||||||
if (activityType === 'View') {
|
if (activityType === 'View') {
|
||||||
return processCreateView(actor, activity)
|
return processCreateView(byActor, activity)
|
||||||
} else if (activityType === 'Dislike') {
|
} else if (activityType === 'Dislike') {
|
||||||
return retryTransactionWrapper(processCreateDislike, actor, activity)
|
return retryTransactionWrapper(processCreateDislike, byActor, activity)
|
||||||
} else if (activityType === 'Video') {
|
} else if (activityType === 'Video') {
|
||||||
return processCreateVideo(activity)
|
return processCreateVideo(activity)
|
||||||
} else if (activityType === 'Flag') {
|
} else if (activityType === 'Flag') {
|
||||||
return retryTransactionWrapper(processCreateVideoAbuse, actor, activityObject as VideoAbuseObject)
|
return retryTransactionWrapper(processCreateVideoAbuse, byActor, activityObject as VideoAbuseObject)
|
||||||
} else if (activityType === 'Note') {
|
} else if (activityType === 'Note') {
|
||||||
return retryTransactionWrapper(processCreateVideoComment, actor, activity)
|
return retryTransactionWrapper(processCreateVideoComment, byActor, activity)
|
||||||
} else if (activityType === 'CacheFile') {
|
} else if (activityType === 'CacheFile') {
|
||||||
return retryTransactionWrapper(processCacheFile, actor, activity)
|
return retryTransactionWrapper(processCacheFile, byActor, activity)
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.warn('Unknown activity object type %s when creating activity.', activityType, { activity: activity.id })
|
logger.warn('Unknown activity object type %s when creating activity.', activityType, { activity: activity.id })
|
||||||
|
@ -118,11 +116,11 @@ async function processCacheFile (byActor: ActorModel, activity: ActivityCreate)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async function processCreateVideoAbuse (actor: ActorModel, videoAbuseToCreateData: VideoAbuseObject) {
|
async function processCreateVideoAbuse (byActor: ActorModel, videoAbuseToCreateData: VideoAbuseObject) {
|
||||||
logger.debug('Reporting remote abuse for video %s.', videoAbuseToCreateData.object)
|
logger.debug('Reporting remote abuse for video %s.', videoAbuseToCreateData.object)
|
||||||
|
|
||||||
const account = actor.Account
|
const account = byActor.Account
|
||||||
if (!account) throw new Error('Cannot create dislike with the non account actor ' + actor.url)
|
if (!account) throw new Error('Cannot create dislike with the non account actor ' + byActor.url)
|
||||||
|
|
||||||
const { video } = await getOrCreateVideoAndAccountAndChannel({ videoObject: videoAbuseToCreateData.object })
|
const { video } = await getOrCreateVideoAndAccountAndChannel({ videoObject: videoAbuseToCreateData.object })
|
||||||
|
|
||||||
|
|
|
@ -7,34 +7,32 @@ import { ActorModel } from '../../../models/activitypub/actor'
|
||||||
import { VideoModel } from '../../../models/video/video'
|
import { VideoModel } from '../../../models/video/video'
|
||||||
import { VideoChannelModel } from '../../../models/video/video-channel'
|
import { VideoChannelModel } from '../../../models/video/video-channel'
|
||||||
import { VideoCommentModel } from '../../../models/video/video-comment'
|
import { VideoCommentModel } from '../../../models/video/video-comment'
|
||||||
import { getOrCreateActorAndServerAndModel } from '../actor'
|
|
||||||
import { forwardActivity } from '../send/utils'
|
import { forwardActivity } from '../send/utils'
|
||||||
|
|
||||||
async function processDeleteActivity (activity: ActivityDelete) {
|
async function processDeleteActivity (activity: ActivityDelete, byActor: ActorModel) {
|
||||||
const objectUrl = typeof activity.object === 'string' ? activity.object : activity.object.id
|
const objectUrl = typeof activity.object === 'string' ? activity.object : activity.object.id
|
||||||
|
|
||||||
if (activity.actor === objectUrl) {
|
if (activity.actor === objectUrl) {
|
||||||
let actor = await ActorModel.loadByUrl(activity.actor)
|
// We need more attributes (all the account and channel)
|
||||||
if (!actor) return undefined
|
const byActorFull = await ActorModel.loadByUrlAndPopulateAccountAndChannel(byActor.url)
|
||||||
|
|
||||||
if (actor.type === 'Person') {
|
if (byActorFull.type === 'Person') {
|
||||||
if (!actor.Account) throw new Error('Actor ' + actor.url + ' is a person but we cannot find it in database.')
|
if (!byActorFull.Account) throw new Error('Actor ' + byActorFull.url + ' is a person but we cannot find it in database.')
|
||||||
|
|
||||||
actor.Account.Actor = await actor.Account.$get('Actor') as ActorModel
|
byActorFull.Account.Actor = await byActorFull.Account.$get('Actor') as ActorModel
|
||||||
return retryTransactionWrapper(processDeleteAccount, actor.Account)
|
return retryTransactionWrapper(processDeleteAccount, byActorFull.Account)
|
||||||
} else if (actor.type === 'Group') {
|
} else if (byActorFull.type === 'Group') {
|
||||||
if (!actor.VideoChannel) throw new Error('Actor ' + actor.url + ' is a group but we cannot find it in database.')
|
if (!byActorFull.VideoChannel) throw new Error('Actor ' + byActorFull.url + ' is a group but we cannot find it in database.')
|
||||||
|
|
||||||
actor.VideoChannel.Actor = await actor.VideoChannel.$get('Actor') as ActorModel
|
byActorFull.VideoChannel.Actor = await byActorFull.VideoChannel.$get('Actor') as ActorModel
|
||||||
return retryTransactionWrapper(processDeleteVideoChannel, actor.VideoChannel)
|
return retryTransactionWrapper(processDeleteVideoChannel, byActorFull.VideoChannel)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const actor = await getOrCreateActorAndServerAndModel(activity.actor)
|
|
||||||
{
|
{
|
||||||
const videoCommentInstance = await VideoCommentModel.loadByUrlAndPopulateAccount(objectUrl)
|
const videoCommentInstance = await VideoCommentModel.loadByUrlAndPopulateAccount(objectUrl)
|
||||||
if (videoCommentInstance) {
|
if (videoCommentInstance) {
|
||||||
return retryTransactionWrapper(processDeleteVideoComment, actor, videoCommentInstance, activity)
|
return retryTransactionWrapper(processDeleteVideoComment, byActor, videoCommentInstance, activity)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -43,7 +41,7 @@ async function processDeleteActivity (activity: ActivityDelete) {
|
||||||
if (videoInstance) {
|
if (videoInstance) {
|
||||||
if (videoInstance.isOwned()) throw new Error(`Remote instance cannot delete owned video ${videoInstance.url}.`)
|
if (videoInstance.isOwned()) throw new Error(`Remote instance cannot delete owned video ${videoInstance.url}.`)
|
||||||
|
|
||||||
return retryTransactionWrapper(processDeleteVideo, actor, videoInstance)
|
return retryTransactionWrapper(processDeleteVideo, byActor, videoInstance)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -4,14 +4,12 @@ import { logger } from '../../../helpers/logger'
|
||||||
import { sequelizeTypescript } from '../../../initializers'
|
import { sequelizeTypescript } from '../../../initializers'
|
||||||
import { ActorModel } from '../../../models/activitypub/actor'
|
import { ActorModel } from '../../../models/activitypub/actor'
|
||||||
import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
|
import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
|
||||||
import { getOrCreateActorAndServerAndModel } from '../actor'
|
|
||||||
import { sendAccept } from '../send'
|
import { sendAccept } from '../send'
|
||||||
|
|
||||||
async function processFollowActivity (activity: ActivityFollow) {
|
async function processFollowActivity (activity: ActivityFollow, byActor: ActorModel) {
|
||||||
const activityObject = activity.object
|
const activityObject = activity.object
|
||||||
const actor = await getOrCreateActorAndServerAndModel(activity.actor)
|
|
||||||
|
|
||||||
return retryTransactionWrapper(processFollow, actor, activityObject)
|
return retryTransactionWrapper(processFollow, byActor, activityObject)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
|
@ -24,7 +22,7 @@ export {
|
||||||
|
|
||||||
async function processFollow (actor: ActorModel, targetActorURL: string) {
|
async function processFollow (actor: ActorModel, targetActorURL: string) {
|
||||||
await sequelizeTypescript.transaction(async t => {
|
await sequelizeTypescript.transaction(async t => {
|
||||||
const targetActor = await ActorModel.loadByUrl(targetActorURL, t)
|
const targetActor = await ActorModel.loadByUrlAndPopulateAccountAndChannel(targetActorURL, t)
|
||||||
|
|
||||||
if (!targetActor) throw new Error('Unknown actor')
|
if (!targetActor) throw new Error('Unknown actor')
|
||||||
if (targetActor.isOwned() === false) throw new Error('This is not a local actor.')
|
if (targetActor.isOwned() === false) throw new Error('This is not a local actor.')
|
||||||
|
|
|
@ -3,14 +3,11 @@ import { retryTransactionWrapper } from '../../../helpers/database-utils'
|
||||||
import { sequelizeTypescript } from '../../../initializers'
|
import { sequelizeTypescript } from '../../../initializers'
|
||||||
import { AccountVideoRateModel } from '../../../models/account/account-video-rate'
|
import { AccountVideoRateModel } from '../../../models/account/account-video-rate'
|
||||||
import { ActorModel } from '../../../models/activitypub/actor'
|
import { ActorModel } from '../../../models/activitypub/actor'
|
||||||
import { getOrCreateActorAndServerAndModel } from '../actor'
|
|
||||||
import { forwardVideoRelatedActivity } from '../send/utils'
|
import { forwardVideoRelatedActivity } from '../send/utils'
|
||||||
import { getOrCreateVideoAndAccountAndChannel } from '../videos'
|
import { getOrCreateVideoAndAccountAndChannel } from '../videos'
|
||||||
|
|
||||||
async function processLikeActivity (activity: ActivityLike) {
|
async function processLikeActivity (activity: ActivityLike, byActor: ActorModel) {
|
||||||
const actor = await getOrCreateActorAndServerAndModel(activity.actor)
|
return retryTransactionWrapper(processLikeVideo, byActor, activity)
|
||||||
|
|
||||||
return retryTransactionWrapper(processLikeVideo, actor, activity)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
|
|
|
@ -1,15 +1,11 @@
|
||||||
import { ActivityReject } from '../../../../shared/models/activitypub/activity'
|
import { ActivityReject } from '../../../../shared/models/activitypub/activity'
|
||||||
import { getActorUrl } from '../../../helpers/activitypub'
|
|
||||||
import { sequelizeTypescript } from '../../../initializers'
|
import { sequelizeTypescript } from '../../../initializers'
|
||||||
import { ActorModel } from '../../../models/activitypub/actor'
|
import { ActorModel } from '../../../models/activitypub/actor'
|
||||||
import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
|
import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
|
||||||
|
|
||||||
async function processRejectActivity (activity: ActivityReject, inboxActor?: ActorModel) {
|
async function processRejectActivity (activity: ActivityReject, targetActor: ActorModel, inboxActor?: ActorModel) {
|
||||||
if (inboxActor === undefined) throw new Error('Need to reject on explicit inbox.')
|
if (inboxActor === undefined) throw new Error('Need to reject on explicit inbox.')
|
||||||
|
|
||||||
const actorUrl = getActorUrl(activity.actor)
|
|
||||||
const targetActor = await ActorModel.loadByUrl(actorUrl)
|
|
||||||
|
|
||||||
return processReject(inboxActor, targetActor)
|
return processReject(inboxActor, targetActor)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -13,7 +13,7 @@ import { getOrCreateVideoAndAccountAndChannel } from '../videos'
|
||||||
import { VideoShareModel } from '../../../models/video/video-share'
|
import { VideoShareModel } from '../../../models/video/video-share'
|
||||||
import { VideoRedundancyModel } from '../../../models/redundancy/video-redundancy'
|
import { VideoRedundancyModel } from '../../../models/redundancy/video-redundancy'
|
||||||
|
|
||||||
async function processUndoActivity (activity: ActivityUndo) {
|
async function processUndoActivity (activity: ActivityUndo, byActor: ActorModel) {
|
||||||
const activityToUndo = activity.object
|
const activityToUndo = activity.object
|
||||||
|
|
||||||
const actorUrl = getActorUrl(activity.actor)
|
const actorUrl = getActorUrl(activity.actor)
|
||||||
|
@ -26,16 +26,16 @@ async function processUndoActivity (activity: ActivityUndo) {
|
||||||
if (activityToUndo.object.type === 'Dislike') {
|
if (activityToUndo.object.type === 'Dislike') {
|
||||||
return retryTransactionWrapper(processUndoDislike, actorUrl, activity)
|
return retryTransactionWrapper(processUndoDislike, actorUrl, activity)
|
||||||
} else if (activityToUndo.object.type === 'CacheFile') {
|
} else if (activityToUndo.object.type === 'CacheFile') {
|
||||||
return retryTransactionWrapper(processUndoCacheFile, actorUrl, activity)
|
return retryTransactionWrapper(processUndoCacheFile, byActor, activity)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (activityToUndo.type === 'Follow') {
|
if (activityToUndo.type === 'Follow') {
|
||||||
return retryTransactionWrapper(processUndoFollow, actorUrl, activityToUndo)
|
return retryTransactionWrapper(processUndoFollow, byActor, activityToUndo)
|
||||||
}
|
}
|
||||||
|
|
||||||
if (activityToUndo.type === 'Announce') {
|
if (activityToUndo.type === 'Announce') {
|
||||||
return retryTransactionWrapper(processUndoAnnounce, actorUrl, activityToUndo)
|
return retryTransactionWrapper(processUndoAnnounce, byActor, activityToUndo)
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.warn('Unknown activity object type %s -> %s when undo activity.', activityToUndo.type, { activity: activity.id })
|
logger.warn('Unknown activity object type %s -> %s when undo activity.', activityToUndo.type, { activity: activity.id })
|
||||||
|
@ -99,15 +99,12 @@ async function processUndoDislike (actorUrl: string, activity: ActivityUndo) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
async function processUndoCacheFile (actorUrl: string, activity: ActivityUndo) {
|
async function processUndoCacheFile (byActor: ActorModel, activity: ActivityUndo) {
|
||||||
const cacheFileObject = activity.object.object as CacheFileObject
|
const cacheFileObject = activity.object.object as CacheFileObject
|
||||||
|
|
||||||
const { video } = await getOrCreateVideoAndAccountAndChannel({ videoObject: cacheFileObject.object })
|
const { video } = await getOrCreateVideoAndAccountAndChannel({ videoObject: cacheFileObject.object })
|
||||||
|
|
||||||
return sequelizeTypescript.transaction(async t => {
|
return sequelizeTypescript.transaction(async t => {
|
||||||
const byActor = await ActorModel.loadByUrl(actorUrl)
|
|
||||||
if (!byActor) throw new Error('Unknown actor ' + actorUrl)
|
|
||||||
|
|
||||||
const cacheFile = await VideoRedundancyModel.loadByUrl(cacheFileObject.id)
|
const cacheFile = await VideoRedundancyModel.loadByUrl(cacheFileObject.id)
|
||||||
if (!cacheFile) throw new Error('Unknown video cache ' + cacheFile.url)
|
if (!cacheFile) throw new Error('Unknown video cache ' + cacheFile.url)
|
||||||
|
|
||||||
|
@ -122,10 +119,9 @@ async function processUndoCacheFile (actorUrl: string, activity: ActivityUndo) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
function processUndoFollow (actorUrl: string, followActivity: ActivityFollow) {
|
function processUndoFollow (follower: ActorModel, followActivity: ActivityFollow) {
|
||||||
return sequelizeTypescript.transaction(async t => {
|
return sequelizeTypescript.transaction(async t => {
|
||||||
const follower = await ActorModel.loadByUrl(actorUrl, t)
|
const following = await ActorModel.loadByUrlAndPopulateAccountAndChannel(followActivity.object, t)
|
||||||
const following = await ActorModel.loadByUrl(followActivity.object, t)
|
|
||||||
const actorFollow = await ActorFollowModel.loadByActorAndTarget(follower.id, following.id, t)
|
const actorFollow = await ActorFollowModel.loadByActorAndTarget(follower.id, following.id, t)
|
||||||
|
|
||||||
if (!actorFollow) throw new Error(`'Unknown actor follow ${follower.id} -> ${following.id}.`)
|
if (!actorFollow) throw new Error(`'Unknown actor follow ${follower.id} -> ${following.id}.`)
|
||||||
|
@ -136,11 +132,8 @@ function processUndoFollow (actorUrl: string, followActivity: ActivityFollow) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
function processUndoAnnounce (actorUrl: string, announceActivity: ActivityAnnounce) {
|
function processUndoAnnounce (byActor: ActorModel, announceActivity: ActivityAnnounce) {
|
||||||
return sequelizeTypescript.transaction(async t => {
|
return sequelizeTypescript.transaction(async t => {
|
||||||
const byActor = await ActorModel.loadByUrl(actorUrl, t)
|
|
||||||
if (!byActor) throw new Error('Unknown actor ' + actorUrl)
|
|
||||||
|
|
||||||
const share = await VideoShareModel.loadByUrl(announceActivity.id, t)
|
const share = await VideoShareModel.loadByUrl(announceActivity.id, t)
|
||||||
if (!share) throw new Error(`Unknown video share ${announceActivity.id}.`)
|
if (!share) throw new Error(`Unknown video share ${announceActivity.id}.`)
|
||||||
|
|
||||||
|
|
|
@ -6,27 +6,30 @@ import { sequelizeTypescript } from '../../../initializers'
|
||||||
import { AccountModel } from '../../../models/account/account'
|
import { AccountModel } from '../../../models/account/account'
|
||||||
import { ActorModel } from '../../../models/activitypub/actor'
|
import { ActorModel } from '../../../models/activitypub/actor'
|
||||||
import { VideoChannelModel } from '../../../models/video/video-channel'
|
import { VideoChannelModel } from '../../../models/video/video-channel'
|
||||||
import { fetchAvatarIfExists, getOrCreateActorAndServerAndModel, updateActorAvatarInstance, updateActorInstance } from '../actor'
|
import { fetchAvatarIfExists, updateActorAvatarInstance, updateActorInstance } from '../actor'
|
||||||
import { getOrCreateVideoAndAccountAndChannel, updateVideoFromAP, getOrCreateVideoChannelFromVideoObject } from '../videos'
|
import { getOrCreateVideoAndAccountAndChannel, getOrCreateVideoChannelFromVideoObject, updateVideoFromAP } from '../videos'
|
||||||
import { sanitizeAndCheckVideoTorrentObject } from '../../../helpers/custom-validators/activitypub/videos'
|
import { sanitizeAndCheckVideoTorrentObject } from '../../../helpers/custom-validators/activitypub/videos'
|
||||||
import { isCacheFileObjectValid } from '../../../helpers/custom-validators/activitypub/cache-file'
|
import { isCacheFileObjectValid } from '../../../helpers/custom-validators/activitypub/cache-file'
|
||||||
import { VideoRedundancyModel } from '../../../models/redundancy/video-redundancy'
|
import { VideoRedundancyModel } from '../../../models/redundancy/video-redundancy'
|
||||||
import { createCacheFile, updateCacheFile } from '../cache-file'
|
import { createCacheFile, updateCacheFile } from '../cache-file'
|
||||||
|
|
||||||
async function processUpdateActivity (activity: ActivityUpdate) {
|
async function processUpdateActivity (activity: ActivityUpdate, byActor: ActorModel) {
|
||||||
const actor = await getOrCreateActorAndServerAndModel(activity.actor)
|
|
||||||
const objectType = activity.object.type
|
const objectType = activity.object.type
|
||||||
|
|
||||||
if (objectType === 'Video') {
|
if (objectType === 'Video') {
|
||||||
return retryTransactionWrapper(processUpdateVideo, actor, activity)
|
return retryTransactionWrapper(processUpdateVideo, byActor, activity)
|
||||||
}
|
}
|
||||||
|
|
||||||
if (objectType === 'Person' || objectType === 'Application' || objectType === 'Group') {
|
if (objectType === 'Person' || objectType === 'Application' || objectType === 'Group') {
|
||||||
return retryTransactionWrapper(processUpdateActor, actor, activity)
|
// We need more attributes
|
||||||
|
const byActorFull = await ActorModel.loadByUrlAndPopulateAccountAndChannel(byActor.url)
|
||||||
|
return retryTransactionWrapper(processUpdateActor, byActorFull, activity)
|
||||||
}
|
}
|
||||||
|
|
||||||
if (objectType === 'CacheFile') {
|
if (objectType === 'CacheFile') {
|
||||||
return retryTransactionWrapper(processUpdateCacheFile, actor, activity)
|
// We need more attributes
|
||||||
|
const byActorFull = await ActorModel.loadByUrlAndPopulateAccountAndChannel(byActor.url)
|
||||||
|
return retryTransactionWrapper(processUpdateCacheFile, byActorFull, activity)
|
||||||
}
|
}
|
||||||
|
|
||||||
return undefined
|
return undefined
|
||||||
|
|
|
@ -11,8 +11,9 @@ import { processLikeActivity } from './process-like'
|
||||||
import { processRejectActivity } from './process-reject'
|
import { processRejectActivity } from './process-reject'
|
||||||
import { processUndoActivity } from './process-undo'
|
import { processUndoActivity } from './process-undo'
|
||||||
import { processUpdateActivity } from './process-update'
|
import { processUpdateActivity } from './process-update'
|
||||||
|
import { getOrCreateActorAndServerAndModel } from '../actor'
|
||||||
|
|
||||||
const processActivity: { [ P in ActivityType ]: (activity: Activity, inboxActor?: ActorModel) => Promise<any> } = {
|
const processActivity: { [ P in ActivityType ]: (activity: Activity, byActor: ActorModel, inboxActor?: ActorModel) => Promise<any> } = {
|
||||||
Create: processCreateActivity,
|
Create: processCreateActivity,
|
||||||
Update: processUpdateActivity,
|
Update: processUpdateActivity,
|
||||||
Delete: processDeleteActivity,
|
Delete: processDeleteActivity,
|
||||||
|
@ -25,6 +26,8 @@ const processActivity: { [ P in ActivityType ]: (activity: Activity, inboxActor?
|
||||||
}
|
}
|
||||||
|
|
||||||
async function processActivities (activities: Activity[], signatureActor?: ActorModel, inboxActor?: ActorModel) {
|
async function processActivities (activities: Activity[], signatureActor?: ActorModel, inboxActor?: ActorModel) {
|
||||||
|
const actorsCache: { [ url: string ]: ActorModel } = {}
|
||||||
|
|
||||||
for (const activity of activities) {
|
for (const activity of activities) {
|
||||||
const actorUrl = getActorUrl(activity.actor)
|
const actorUrl = getActorUrl(activity.actor)
|
||||||
|
|
||||||
|
@ -34,6 +37,9 @@ async function processActivities (activities: Activity[], signatureActor?: Actor
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const byActor = signatureActor || actorsCache[actorUrl] || await getOrCreateActorAndServerAndModel(actorUrl)
|
||||||
|
actorsCache[actorUrl] = byActor
|
||||||
|
|
||||||
const activityProcessor = processActivity[activity.type]
|
const activityProcessor = processActivity[activity.type]
|
||||||
if (activityProcessor === undefined) {
|
if (activityProcessor === undefined) {
|
||||||
logger.warn('Unknown activity type %s.', activity.type, { activityId: activity.id })
|
logger.warn('Unknown activity type %s.', activity.type, { activityId: activity.id })
|
||||||
|
@ -41,7 +47,7 @@ async function processActivities (activities: Activity[], signatureActor?: Actor
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await activityProcessor(activity, inboxActor)
|
await activityProcessor(activity, byActor, inboxActor)
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
logger.warn('Cannot process activity %s.', activity.type, { err })
|
logger.warn('Cannot process activity %s.', activity.type, { err })
|
||||||
}
|
}
|
||||||
|
|
|
@ -107,7 +107,7 @@ function getOrCreateVideoChannelFromVideoObject (videoObject: VideoTorrentObject
|
||||||
const channel = videoObject.attributedTo.find(a => a.type === 'Group')
|
const channel = videoObject.attributedTo.find(a => a.type === 'Group')
|
||||||
if (!channel) throw new Error('Cannot find associated video channel to video ' + videoObject.url)
|
if (!channel) throw new Error('Cannot find associated video channel to video ' + videoObject.url)
|
||||||
|
|
||||||
return getOrCreateActorAndServerAndModel(channel.id)
|
return getOrCreateActorAndServerAndModel(channel.id, 'all')
|
||||||
}
|
}
|
||||||
|
|
||||||
type SyncParam = {
|
type SyncParam = {
|
||||||
|
|
|
@ -323,6 +323,29 @@ export class ActorModel extends Model<ActorModel> {
|
||||||
}
|
}
|
||||||
|
|
||||||
static loadByUrl (url: string, transaction?: Sequelize.Transaction) {
|
static loadByUrl (url: string, transaction?: Sequelize.Transaction) {
|
||||||
|
const query = {
|
||||||
|
where: {
|
||||||
|
url
|
||||||
|
},
|
||||||
|
transaction,
|
||||||
|
include: [
|
||||||
|
{
|
||||||
|
attributes: [ 'id' ],
|
||||||
|
model: AccountModel.unscoped(),
|
||||||
|
required: false
|
||||||
|
},
|
||||||
|
{
|
||||||
|
attributes: [ 'id' ],
|
||||||
|
model: VideoChannelModel.unscoped(),
|
||||||
|
required: false
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
return ActorModel.unscoped().findOne(query)
|
||||||
|
}
|
||||||
|
|
||||||
|
static loadByUrlAndPopulateAccountAndChannel (url: string, transaction?: Sequelize.Transaction) {
|
||||||
const query = {
|
const query = {
|
||||||
where: {
|
where: {
|
||||||
url
|
url
|
||||||
|
|
Loading…
Reference in New Issue
Block a user