Skip to content

Commit

Permalink
Add custom upload ability for runners
Browse files Browse the repository at this point in the history
  • Loading branch information
Chocobozzz committed Dec 13, 2024
1 parent 281e7ae commit 9f33ae7
Show file tree
Hide file tree
Showing 25 changed files with 407 additions and 73 deletions.
8 changes: 6 additions & 2 deletions apps/peertube-runner/src/server/process/shared/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,12 @@ export function scheduleTranscodingProgress (options: {
: 60000

const update = () => {
server.runnerJobs.update({ jobToken: job.jobToken, jobUUID: job.uuid, runnerToken, progress: progressGetter() })
.catch(err => logger.error({ err }, 'Cannot send job progress'))
server.runnerJobs.update({
jobToken: job.jobToken,
jobUUID: job.uuid,
runnerToken,
progress: progressGetter()
}).catch(err => logger.error({ err }, 'Cannot send job progress'))
}

const interval = setInterval(() => {
Expand Down
41 changes: 22 additions & 19 deletions apps/peertube-runner/src/server/process/shared/process-live.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,22 +50,23 @@ export class ProcessLiveRTMPHLSTranscoding {
logger.debug(`Using ${this.outputPath} to process live rtmp hls transcoding job ${options.job.uuid}`)
}

process () {
const job = this.options.job
const payload = job.payload
private get payload () {
return this.options.job.payload
}

process () {
return new Promise<void>(async (res, rej) => {
try {
await ensureDir(this.outputPath)

logger.info(`Probing ${payload.input.rtmpUrl}`)
const probe = await ffprobePromise(payload.input.rtmpUrl)
logger.info({ probe }, `Probed ${payload.input.rtmpUrl}`)
logger.info(`Probing ${this.payload.input.rtmpUrl}`)
const probe = await ffprobePromise(this.payload.input.rtmpUrl)
logger.info({ probe }, `Probed ${this.payload.input.rtmpUrl}`)

const hasAudio = await hasAudioStream(payload.input.rtmpUrl, probe)
const hasVideo = await hasVideoStream(payload.input.rtmpUrl, probe)
const bitrate = await getVideoStreamBitrate(payload.input.rtmpUrl, probe)
const { ratio } = await getVideoStreamDimensionsInfo(payload.input.rtmpUrl, probe)
const hasAudio = await hasAudioStream(this.payload.input.rtmpUrl, probe)
const hasVideo = await hasVideoStream(this.payload.input.rtmpUrl, probe)
const bitrate = await getVideoStreamBitrate(this.payload.input.rtmpUrl, probe)
const { ratio } = await getVideoStreamDimensionsInfo(this.payload.input.rtmpUrl, probe)

const m3u8Watcher = watch(this.outputPath + '/*.m3u8')
this.fsWatchers.push(m3u8Watcher)
Expand Down Expand Up @@ -107,15 +108,15 @@ export class ProcessLiveRTMPHLSTranscoding {
})

this.ffmpegCommand = await buildFFmpegLive().getLiveTranscodingCommand({
inputUrl: payload.input.rtmpUrl,
inputUrl: this.payload.input.rtmpUrl,

outPath: this.outputPath,
masterPlaylistName: 'master.m3u8',

segmentListSize: payload.output.segmentListSize,
segmentDuration: payload.output.segmentDuration,
segmentListSize: this.payload.output.segmentListSize,
segmentDuration: this.payload.output.segmentDuration,

toTranscode: payload.output.toTranscode,
toTranscode: this.payload.output.toTranscode,
splitAudioAndVideo: true,

bitrate,
Expand All @@ -126,7 +127,7 @@ export class ProcessLiveRTMPHLSTranscoding {
probe
})

logger.info(`Running live transcoding for ${payload.input.rtmpUrl}`)
logger.info(`Running live transcoding for ${this.payload.input.rtmpUrl}`)

this.ffmpegCommand.on('error', (err, stdout, stderr) => {
this.onFFmpegError({ err, stdout, stderr })
Expand Down Expand Up @@ -241,7 +242,8 @@ export class ProcessLiveRTMPHLSTranscoding {
jobToken: this.options.job.jobToken,
jobUUID: this.options.job.uuid,
runnerToken: this.options.runnerToken,
payload: successBody
payload: successBody,
reqPayload: this.payload
})
}

Expand Down Expand Up @@ -324,15 +326,16 @@ export class ProcessLiveRTMPHLSTranscoding {
await Promise.all(parallelPromises)
}

private async updateWithRetry (payload: CustomLiveRTMPHLSTranscodingUpdatePayload, currentTry = 1): Promise<any> {
private async updateWithRetry (updatePayload: CustomLiveRTMPHLSTranscodingUpdatePayload, currentTry = 1): Promise<any> {
if (this.ended || this.errored) return

try {
await this.options.server.runnerJobs.update({
jobToken: this.options.job.jobToken,
jobUUID: this.options.job.uuid,
runnerToken: this.options.runnerToken,
payload: payload as any
payload: updatePayload as any,
reqPayload: this.payload
})
} catch (err) {
if (currentTry >= 3) throw err
Expand All @@ -341,7 +344,7 @@ export class ProcessLiveRTMPHLSTranscoding {
logger.warn({ err }, 'Will retry update after error')
await wait(250)

return this.updateWithRetry(payload, currentTry + 1)
return this.updateWithRetry(updatePayload, currentTry + 1)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ export async function processStudioTranscoding (options: ProcessOptions<RunnerJo
jobToken: job.jobToken,
jobUUID: job.uuid,
runnerToken,
payload: successBody
payload: successBody,
reqPayload: payload
})
} finally {
if (tmpVideoInputFilePath) await remove(tmpVideoInputFilePath)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ export async function processVideoTranscription (options: ProcessOptions<RunnerJ
jobToken: job.jobToken,
jobUUID: job.uuid,
runnerToken,
payload: successBody
payload: successBody,
reqPayload: payload
})
} finally {
if (inputPath) await remove(inputPath)
Expand Down
9 changes: 6 additions & 3 deletions apps/peertube-runner/src/server/process/shared/process-vod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ export async function processWebVideoTranscoding (options: ProcessOptions<Runner
jobToken: job.jobToken,
jobUUID: job.uuid,
runnerToken,
payload: successBody
payload: successBody,
reqPayload: payload
})
} finally {
if (videoInputPath) await remove(videoInputPath)
Expand Down Expand Up @@ -139,7 +140,8 @@ export async function processHLSTranscoding (options: ProcessOptions<RunnerJobVO
jobToken: job.jobToken,
jobUUID: job.uuid,
runnerToken,
payload: successBody
payload: successBody,
reqPayload: payload
})
} finally {
if (videoInputPath) await remove(videoInputPath)
Expand Down Expand Up @@ -207,7 +209,8 @@ export async function processAudioMergeTranscoding (options: ProcessOptions<Runn
jobToken: job.jobToken,
jobUUID: job.uuid,
runnerToken,
payload: successBody
payload: successBody,
reqPayload: payload
})
} finally {
if (audioPath) await remove(audioPath)
Expand Down
4 changes: 2 additions & 2 deletions packages/models/src/runners/accept-runner-job-result.model.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { RunnerJobPayload } from './runner-job-payload.model.js'
import { RunnerJob } from './runner-job.model.js'
import { RunnerJobPayload } from './runner-jobs/runner-job-payload.model.js'
import { RunnerJob } from './runner-jobs/runner-job.model.js'

export interface AcceptRunnerJobResult <T extends RunnerJobPayload = RunnerJobPayload> {
job: RunnerJob<T> & { jobToken: string }
Expand Down
14 changes: 7 additions & 7 deletions packages/models/src/runners/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ export * from './register-runner-body.model.js'
export * from './register-runner-result.model.js'
export * from './request-runner-job-body.model.js'
export * from './request-runner-job-result.model.js'
export * from './runner-job-payload.model.js'
export * from './runner-job-private-payload.model.js'
export * from './runner-job-state.model.js'
export * from './runner-job-success-body.model.js'
export * from './runner-job-type.type.js'
export * from './runner-job-update-body.model.js'
export * from './runner-job.model.js'
export * from './runner-jobs/runner-job-payload.model.js'
export * from './runner-jobs/runner-job-private-payload.model.js'
export * from './runner-jobs/runner-job-state.model.js'
export * from './runner-jobs/runner-job-success-body.model.js'
export * from './runner-jobs/runner-job-type.type.js'
export * from './runner-jobs/runner-job-update-body.model.js'
export * from './runner-jobs/runner-job.model.js'
export * from './runner-registration-token.js'
export * from './runner.model.js'
export * from './unregister-runner-body.model.js'
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { RunnerJobStateType } from './runner-job-state.model.js'
import { RunnerJobStateType } from './runner-jobs/runner-job-state.model.js'

export interface ListRunnerJobsQuery {
start?: number
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { RunnerJobType } from './runner-job-type.type.js'
import { RunnerJobType } from './runner-jobs/runner-job-type.type.js'

export interface RequestRunnerJobBody {
runnerToken: string
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { RunnerJobPayload } from './runner-job-payload.model.js'
import { RunnerJobType } from './runner-job-type.type.js'
import { RunnerJobPayload } from './runner-jobs/runner-job-payload.model.js'
import { RunnerJobType } from './runner-jobs/runner-job-type.type.js'

export interface RequestRunnerJobResult <P extends RunnerJobPayload = RunnerJobPayload> {
availableJobs: {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
import { VideoStudioTaskPayload } from '../server/index.js'
import { VideoStudioTaskPayload } from '../../server/index.js'

export type RunnerJobCustomUpload = {
url: string
method?: 'PUT' | 'POST' // default 'PUT'
}

export type RunnerJobVODPayload =
RunnerJobVODWebVideoTranscodingPayload |
Expand All @@ -22,6 +27,9 @@ export interface RunnerJobVODWebVideoTranscodingPayload {
output: {
resolution: number
fps: number

// To upload on an external URL
videoFileCustomUpload?: RunnerJobCustomUpload
}
}

Expand All @@ -35,6 +43,10 @@ export interface RunnerJobVODHLSTranscodingPayload {
resolution: number
fps: number
separatedAudio: boolean

// To upload on an external URL
videoFileCustomUpload?: RunnerJobCustomUpload
resolutionPlaylistFileCustomUpload?: RunnerJobCustomUpload
}
}

Expand All @@ -47,6 +59,9 @@ export interface RunnerJobVODAudioMergeTranscodingPayload {
output: {
resolution: number
fps: number

// To upload on an external URL
videoFileCustomUpload?: RunnerJobCustomUpload
}
}

Expand All @@ -57,12 +72,22 @@ export interface RunnerJobStudioTranscodingPayload {
}

tasks: VideoStudioTaskPayload[]

output: {
// To upload on an external URL
videoFileCustomUpload?: RunnerJobCustomUpload
}
}

export interface RunnerJobTranscriptionPayload {
input: {
videoFileUrl: string
}

output: {
// To upload on an external URL
vttFileCustomUpload?: RunnerJobCustomUpload
}
}

// ---------------------------------------------------------------------------
Expand All @@ -86,5 +111,10 @@ export interface RunnerJobLiveRTMPHLSTranscodingPayload {

segmentDuration: number
segmentListSize: number

// To upload on an external URL
masterPlaylistFileCustomUpload?: RunnerJobCustomUpload
resolutionPlaylistFileCustomUpload?: RunnerJobCustomUpload
videoChunkFileCustomUpload?: RunnerJobCustomUpload
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { VideoStudioTaskPayload } from '../server/index.js'
import { VideoStudioTaskPayload } from '../../server/index.js'

export type RunnerJobVODPrivatePayload =
RunnerJobVODWebVideoTranscodingPrivatePayload |
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { VideoConstant } from '../videos/index.js'
import { VideoConstant } from '../../videos/index.js'
import { RunnerJobPayload } from './runner-job-payload.model.js'
import { RunnerJobPrivatePayload } from './runner-job-private-payload.model.js'
import { RunnerJobStateType } from './runner-job-state.model.js'
Expand Down
Loading

0 comments on commit 9f33ae7

Please sign in to comment.