Jobs

jobs

Defines classes and functions to manage the server's asynchronous job queue, which handles the execution of long-running operations

Long-Running Operations
  • Transcription
  • SRT Generation
  • Media Conversion
  • LLM SRT-Fixing
How It Works
  • Jobs are queued in-process and run by a single worker coroutine that is initialised on app startup and persists throughout its lifecycle

  • Each job references an existing profile file and persists its output back as profile files or transcripts, which is what lets the client operate on server-side media without re-uploading it
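The queue-and-worker arrangement described above can be sketched with asyncio.Queue. This is a minimal illustration rather than the module's actual code: the worker function, the run_job callback and the None sentinel that ends the loop are all assumptions made for the example.

```python
import asyncio
from uuid import UUID, uuid4


async def worker(queue: asyncio.Queue, run_job) -> None:
    """Drain the queue one job at a time until a None sentinel arrives."""
    while True:
        job_id = await queue.get()
        try:
            if job_id is None:  # hypothetical stop signal for this sketch
                return
            await run_job(job_id)  # the next job starts only after this one ends
        finally:
            queue.task_done()


async def demo_worker() -> list[UUID]:
    queue: asyncio.Queue = asyncio.Queue()
    finished: list[UUID] = []

    async def run_job(job_id: UUID) -> None:
        await asyncio.sleep(0)  # stand-in for a long-running operation
        finished.append(job_id)

    task = asyncio.create_task(worker(queue, run_job))
    ids = [uuid4() for _ in range(3)]
    for job_id in ids:
        await queue.put(job_id)
    await queue.put(None)
    await task
    assert finished == ids  # jobs completed in submission order
    return finished
```

Because only one coroutine reads from the queue, ordering and mutual exclusion come for free, without any locking.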

HANDLERS = {'generate_srt': generate_srt_handler, 'transcribe': transcribe_handler, 'convert': convert_handler, 'fix_srt': fix_srt_handler, 'batch_generate_srt': batch_generate_srt_handler, 'batch_transcribe': batch_transcribe_handler, 'batch_convert': batch_convert_handler, 'batch_fix_srt': batch_fix_srt_handler} module-attribute

Dictionary mapping the type attribute of each database-persisted job to the JobHandler function that executes it, covering all currently available handlers

JobQueueManager

Usage

Allows the client to submit long-running operation requests and tracks their statuses across navigation through the jobs database table

Durability
  • The queue is in-process and does not survive a restart

  • On shutdown, stop attempts to fail any job still left running

  • On startup, start fails any job still left running because of a server crash and re-queues any queued job
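The startup recovery rule can be illustrated as a pure function over the persisted rows. The JobRow class and the status strings below are hypothetical stand-ins for the jobs table, not the real schema.

```python
from dataclasses import dataclass
from uuid import UUID, uuid4


@dataclass
class JobRow:
    """Hypothetical stand-in for a row of the jobs table."""
    id: UUID
    status: str  # "queued", "running", "done" or "failed" in this sketch


def recover(rows: list[JobRow]) -> list[UUID]:
    """Fail jobs interrupted mid-run and return the ids to put back on the queue."""
    to_requeue: list[UUID] = []
    for row in rows:
        if row.status == "running":
            row.status = "failed"  # its worker died with the previous process
        elif row.status == "queued":
            to_requeue.append(row.id)  # never started, so it is safe to run
    return to_requeue
```

Jobs that were running are failed rather than retried, presumably because a half-finished operation may have left partial output behind. Queued jobs never started, so they can simply be enqueued again.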
Concurrency
  • One worker runs jobs sequentially, so two or more heavy local GPU/CPU operations never run simultaneously

  • Cross-file parallelism for batches is the job handler's concern, not the worker's

Job Handlers
  • A Job Handler is an async function that runs the operation for jobs with a specific type attribute and returns its results (e.g. file references, strings, ...) when awaited

  • Handlers are keyed by the job type that they execute in an instance-scoped dictionary

  • Handlers can be registered to an instance by using the register_handler method, which accepts both the handler function and a string representing the specific job type which the handler executes

  • The _run_job function queries the database for the provided job_id, looks up its type attribute and routes execution to the matching registered handler, failing jobs whose type doesn't match any registered handler
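The registration and routing described in this list could look roughly like the sketch below. It is an assumption-laden illustration: the Registry class, the dict-shaped job and the returned status dictionaries are invented for the example, and the job is passed in directly instead of being loaded from the database by job_id.

```python
import asyncio
from typing import Any, Awaitable, Callable

# Assumed handler shape for this sketch: (job, processor) -> awaitable result.
Handler = Callable[[dict, Any], Awaitable[Any]]


class Registry:
    def __init__(self) -> None:
        self.handlers: dict[str, Handler] = {}  # instance-scoped, keyed by job type

    def register_handler(self, job_type: str, handler: Handler) -> None:
        self.handlers[job_type] = handler

    async def run_job(self, job: dict, processor: Any = None) -> dict:
        """Route a job to its handler, or mark it failed if none is registered."""
        handler = self.handlers.get(job["type"])
        if handler is None:
            return {"status": "failed", "error": f"no handler for {job['type']!r}"}
        return {"status": "done", "result": await handler(job, processor)}


async def echo_handler(job: dict, processor: Any) -> str:
    return f"ran {job['type']}"


registry = Registry()
registry.register_handler("transcribe", echo_handler)
```

Keeping the dictionary on the instance rather than at module level lets tests register fake handlers without touching the handlers used by the running app.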

Attributes:

Name Type Description
queue Queue[UUID] | None

The in-process job queue, created in start once the loop is running

_worker_task Task[None] | None

The running worker coroutine, or None when the worker is stopped

processor Processor

The running app's lifecycle-scoped Processor instance which orchestrates actual job execution and is passed to Job Handler functions

handlers dict[str, _JobHandler]

A dictionary mapping each job type to the registered _JobHandler function that executes it

running_job_id property

The id of the job the worker is currently executing, or None

Returns:

Type Description
UUID | None

The in-flight job id, or None when the worker is idle

register_handler(job_type, handler)

Register a new Job Handler to the instance

Parameters:

Name Type Description Default
job_type str

The type attribute of the jobs that this handler is supposed to execute

required
handler _JobHandler

The JobHandler function that executes this type of job

required

start() async

Starts the worker and, on server restarts, rebuilds the asynchronous queue from the jobs persisted in the database on previous runs, requeuing any queued jobs

stop() async

Stops the worker coroutine, cancelling any in-flight wait and marking running jobs as failed
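The cancellation half of this behaviour follows the usual asyncio pattern of cancelling a task and then awaiting it. The sketch below covers only that part, leaving out the step that marks running jobs as failed. The stop_worker and demo_stop names are hypothetical.

```python
import asyncio
import contextlib


async def stop_worker(task: "asyncio.Task | None") -> None:
    """Cancel the worker task and wait until the cancellation has landed."""
    if task is None:
        return
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await task


async def demo_stop() -> bool:
    queue: asyncio.Queue = asyncio.Queue()

    async def worker() -> None:
        while True:
            await queue.get()  # blocks forever here: the queue stays empty

    task = asyncio.create_task(worker())
    await asyncio.sleep(0)  # let the worker reach queue.get()
    await stop_worker(task)
    return task.cancelled()
```

Awaiting the task after cancel() matters: cancel() only requests cancellation, and the task is not finished until it has been given a chance to run again.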

submit_job(job_id) async

Adds an already persisted job to the queue for the worker to run

Parameters:

Name Type Description Default
job_id UUID

The id of the job to enqueue

required

Raises:

Type Description
RuntimeError

If the instance's queue hasn't been initialised with start before the call
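The RuntimeError follows from the queue being None until start has run. A minimal sketch of that guard, using an illustrative MiniQueueManager class that is not the real JobQueueManager:

```python
import asyncio
from uuid import UUID, uuid4


class MiniQueueManager:
    """Illustrative stand-in, not the real JobQueueManager."""

    def __init__(self) -> None:
        self.queue: "asyncio.Queue[UUID] | None" = None  # created in start()

    async def start(self) -> None:
        self.queue = asyncio.Queue()

    async def submit_job(self, job_id: UUID) -> None:
        if self.queue is None:
            raise RuntimeError("start() must be awaited before submit_job()")
        await self.queue.put(job_id)


async def demo_submit() -> tuple[bool, int]:
    manager = MiniQueueManager()
    try:
        await manager.submit_job(uuid4())
        raised = False
    except RuntimeError:
        raised = True
    await manager.start()
    await manager.submit_job(uuid4())
    return raised, manager.queue.qsize()
```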

batch_convert_handler(job, processor) async

Runs a batch of convert operations, one child job per file

The execution strategy is backend-specific. Modal fans the files across GPU containers, while the local backend runs them one at a time
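The difference between the two strategies can be shown with plain asyncio, without calling any Modal API. In this sketch the fan_out flag, run_batch and fake_convert are hypothetical names, and asyncio.gather only stands in for whatever mechanism the remote backend really uses.

```python
import asyncio
from typing import Awaitable, Callable


async def run_batch(
    file_ids: list[str],
    convert_one: Callable[[str], Awaitable[str]],
    fan_out: bool,
) -> list[str]:
    """Run convert_one per file, concurrently when fan_out is set, else in order."""
    if fan_out:
        # Remote-style backend: start every file at once and wait for all of them.
        return list(await asyncio.gather(*(convert_one(f) for f in file_ids)))
    # Local-style backend: one file at a time, so heavy work never overlaps.
    return [await convert_one(f) for f in file_ids]


async def fake_convert(file_id: str) -> str:
    await asyncio.sleep(0)  # stand-in for the real conversion
    return f"{file_id}.mp4"
```

Both branches return results in input order, so the caller can pair each result with its child job regardless of the backend.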

Parameters:

Name Type Description Default
job JobDTO

The batch parent job (its children hold the file ids)

required
processor Processor

The conversion orchestrator

required

Returns:

Type Description
BatchResult

The batch outcome counts and child ids

batch_fix_srt_handler(job, processor) async

Runs a batch of fix_srt operations, one child job per file

LLM SRT-fixing is provider-agnostic and I/O-bound, so the files run through the bounded-concurrency runner on every backend, capped by MAX_LLM_CONCURRENCY
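A bounded-concurrency runner of this kind is commonly built on asyncio.Semaphore. The sketch below is one possible shape, not the module's runner: run_bounded, fix_one and the value given to MAX_LLM_CONCURRENCY are assumptions for illustration.

```python
import asyncio
from typing import Awaitable, Callable

MAX_LLM_CONCURRENCY = 2  # illustrative value; the real cap is not stated here


async def run_bounded(
    file_ids: list[str],
    fix_one: Callable[[str], Awaitable[str]],
    limit: int = MAX_LLM_CONCURRENCY,
) -> list[str]:
    """Run fix_one for every file with at most `limit` calls in flight."""
    semaphore = asyncio.Semaphore(limit)

    async def guarded(file_id: str) -> str:
        async with semaphore:
            return await fix_one(file_id)

    return list(await asyncio.gather(*(guarded(f) for f in file_ids)))


async def demo_bounded() -> tuple[list[str], int]:
    in_flight = 0
    peak = 0

    async def fix_one(file_id: str) -> str:
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.01)  # stand-in for an LLM request
        in_flight -= 1
        return file_id.upper()

    results = await run_bounded(["a", "b", "c", "d"], fix_one)
    return results, peak
```

Since the work is I/O-bound, a cap like this keeps the number of simultaneous provider requests predictable while still overlapping the waiting time.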

Parameters:

Name Type Description Default
job JobDTO

The batch parent job (its children hold the file ids)

required
processor Processor

Unused (the LLM layer is provider-agnostic)

required

Returns:

Type Description
BatchResult

The batch outcome counts and child ids

batch_generate_srt_handler(job, processor) async

Runs a batch of generate_srt operations, one child job per file

The execution strategy is backend-specific. Modal fans the files across containers, while the local backend runs them one at a time

Parameters:

Name Type Description Default
job JobDTO

The batch parent job (its children hold the file ids)

required
processor Processor

The transcription orchestrator

required

Returns:

Type Description
BatchResult

The batch outcome counts and child ids

batch_transcribe_handler(job, processor) async

Runs a batch of transcribe operations, one child job per file

The execution strategy is backend-specific. Modal fans the files across containers, while the local backend runs them one at a time

Parameters:

Name Type Description Default
job JobDTO

The batch parent job (its children hold the file ids)

required
processor Processor

The transcription orchestrator

required

Returns:

Type Description
BatchResult

The batch outcome counts and child ids

convert_handler(job, processor) async

Converts a profile video file to MP4 and stores the result

job

Expects a file_id and an optional ConvertVideoRequest-shaped opts in the job's params

Parameters:

Name Type Description Default
job JobDTO

The single-op job to run

required
processor Processor

The conversion orchestrator

required

Returns:

Type Description
ConvertResult

The new MP4 file's id and media URL

fix_srt_handler(job, processor) async

Refines a profile SRT file with an LLM and stores the cleaned result

job

Expects a file_id, a model selector, and an optional sys_msg in the job's params

Parameters:

Name Type Description Default
job JobDTO

The single-op job to run

required
processor Processor

Unused (the LLM layer is provider-agnostic)

required

Returns:

Type Description
SrtResult

The new SRT file's id, media URL, and content

generate_srt_handler(job, processor) async

Generates raw SRT from a profile video/audio file and stores it

job

Expects a file_id and an optional GenerateSrtRequest-shaped opts in the job's params

Parameters:

Name Type Description Default
job JobDTO

The single-op job to run

required
processor Processor

The transcription orchestrator

required

Returns:

Type Description
SrtResult

The new SRT file's id, media URL, and content

transcribe_handler(job, processor) async

Transcribes a profile audio file to joined text and stores a transcript

job

Expects a file_id and an optional TranscribeAudioRequest-shaped opts in the job's params

Parameters:

Name Type Description Default
job JobDTO

The single-op job to run

required
processor Processor

The transcription orchestrator

required

Returns:

Type Description
TranscribeResult

The new transcript's id and text