Jobs
jobs
Defines classes and functions to manage the server's asynchronous job queue, which handles the execution of long-running operations
Long-Running Operations
- Transcription
- SRT Generation
- Media Conversion
- LLM SRT-Fixing
How It Works
- Jobs are queued in-process and run by a single worker coroutine that is initialised on app startup and persists throughout its lifecycle
- Each job references an existing profile file and persists its output back as profile files or transcripts, which is what lets the client operate on server-side media without re-uploading it
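
The single-worker model described above can be pictured with a minimal `asyncio` sketch. It is an illustration under assumptions, not the module's actual code: `worker`, `main`, and the `done` list are hypothetical names, and a real worker would look the job up in the database and dispatch it to a handler instead of sleeping.

```python
import asyncio
from uuid import UUID, uuid4


async def worker(queue: asyncio.Queue, done: list) -> None:
    """Hypothetical worker: pulls job ids one at a time, forever."""
    while True:
        job_id: UUID = await queue.get()
        try:
            # Stand-in for the real long-running operation
            await asyncio.sleep(0)
            done.append(job_id)
        finally:
            queue.task_done()


async def main() -> tuple:
    queue: asyncio.Queue = asyncio.Queue()  # in-process, lost on restart
    done: list = []
    # Started once, at "app startup"
    task = asyncio.create_task(worker(queue, done))

    submitted = [uuid4() for _ in range(3)]
    for job_id in submitted:
        await queue.put(job_id)

    await queue.join()  # block until every queued job has been processed
    task.cancel()       # "app shutdown"
    try:
        await task
    except asyncio.CancelledError:
        pass
    return submitted, done


submitted, done = asyncio.run(main())
```

Because only one coroutine consumes the queue, jobs finish in the order they were submitted and never overlap.
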
HANDLERS = {'generate_srt': generate_srt_handler, 'transcribe': transcribe_handler, 'convert': convert_handler, 'fix_srt': fix_srt_handler, 'batch_generate_srt': batch_generate_srt_handler, 'batch_transcribe': batch_transcribe_handler, 'batch_convert': batch_convert_handler, 'batch_fix_srt': batch_fix_srt_handler}
module-attribute
Dictionary mapping the `type` attribute of a database-persisted job to the JobHandler function that executes it, covering all currently available handlers
JobQueueManager
Usage
Allows the client to submit long-running operation requests and tracks
their statuses in the jobs database table, so they remain available across client navigation
Durability
- The queue is in-process and does not survive a restart
- On shutdown, `stop` attempts to fail any job still left running
- On startup, `start` fails any job still left `running` because of a server crash and re-queues any `queued` job
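
The startup reconciliation described above could look roughly like the sketch below. Everything here is assumed for illustration: `JobRow`, `recover`, and the in-memory list stand in for the real jobs table and its access layer, and only the `running`, `queued`, and failed states mentioned in this page are used.

```python
import asyncio
from dataclasses import dataclass
from uuid import UUID, uuid4


@dataclass
class JobRow:
    """Hypothetical stand-in for a row of the jobs table."""
    id: UUID
    status: str


def recover(rows: list, queue: asyncio.Queue) -> None:
    """Reconcile persisted jobs with a fresh, empty in-process queue."""
    for row in rows:
        if row.status == "running":
            # Interrupted by the crash, so it is marked failed rather than re-run
            row.status = "failed"
        elif row.status == "queued":
            # Never started, so it goes back on the queue
            queue.put_nowait(row.id)


async def demo() -> tuple:
    rows = [
        JobRow(uuid4(), "running"),
        JobRow(uuid4(), "queued"),
        JobRow(uuid4(), "failed"),
    ]
    queue: asyncio.Queue = asyncio.Queue()
    recover(rows, queue)
    return rows, queue.qsize()


rows, pending = asyncio.run(demo())
```
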
Concurrency
- One worker runs jobs sequentially, so two or more heavy, local GPU/CPU operations never run simultaneously
- Cross-file parallelism for batches is the job handler's concern, not the worker's
Job Handlers
- A Job Handler is a function that runs an operation for a job with a specific `type` attribute and returns its results (e.g. file references, strings, ...) in an awaitable coroutine
- Handlers are keyed by the job `type` that they execute in an instance-scoped dictionary
- Handlers can be registered to an instance by using the `register_handler` method, which accepts both the handler function and a string representing the specific job `type` which the handler executes
- The `_run_job` function queries the database for the provided `job_id`, looks up its `type` attribute and routes execution to the designated registered handler, failing jobs whose `type` doesn't match any registered handler
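
The registration and routing rules above can be sketched as follows. This is a simplified illustration, not the real `JobQueueManager`: `Job`, `MiniJobRouter`, `run_job`, and `fake_transcribe` are hypothetical, the jobs table is replaced by a dict, the `processor` argument real handlers receive is omitted, and `"done"` is an assumed name for the success status.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Dict, Optional


@dataclass
class Job:
    """Hypothetical stand-in for a persisted job."""
    id: int
    type: str
    status: str = "queued"
    result: Any = None
    error: Optional[str] = None


Handler = Callable[[Job], Awaitable[Any]]


class MiniJobRouter:
    def __init__(self, jobs: Dict[int, Job]) -> None:
        self.jobs = jobs                        # stands in for the jobs table
        self.handlers: Dict[str, Handler] = {}  # instance-scoped registry

    def register_handler(self, job_type: str, handler: Handler) -> None:
        self.handlers[job_type] = handler

    async def run_job(self, job_id: int) -> None:
        job = self.jobs[job_id]
        handler = self.handlers.get(job.type)
        if handler is None:
            # No handler registered for this type: fail the job
            job.status, job.error = "failed", f"no handler for type {job.type!r}"
            return
        job.status = "running"
        try:
            job.result = await handler(job)
            job.status = "done"  # assumed name for the success status
        except Exception as exc:
            job.status, job.error = "failed", str(exc)


async def fake_transcribe(job: Job) -> str:
    return f"transcript for job {job.id}"


jobs = {1: Job(1, "transcribe"), 2: Job(2, "unknown_type")}
router = MiniJobRouter(jobs)
router.register_handler("transcribe", fake_transcribe)


async def run_all() -> None:
    for job_id in jobs:
        await router.run_job(job_id)


asyncio.run(run_all())
```

Keeping the registry on the instance rather than at module level means a test can build a router with only the handlers it needs.
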
Attributes:
| Name | Type | Description |
|---|---|---|
| `queue` | `Queue[UUID] \| None` | The in-process job queue, created in `start` |
| `_worker_task` | `Task[None] \| None` | The running worker coroutine, or `None` |
| `processor` | `Processor` | The running app's lifecycle-scoped `Processor` |
| `handlers` | `dict[str, _JobHandler]` | A dictionary mapping registered job `type` strings to their handlers |
running_job_id
property
The id of the job the worker is currently executing, or None
Returns:
| Type | Description |
|---|---|
| `UUID \| None` | The in-flight job id, or `None` |
register_handler(job_type, handler)
Register a new Job Handler to the instance
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `job_type` | `str` | The job `type` that the handler executes | required |
| `handler` | `_JobHandler` | The handler function to register | required |
start()
async
Starts the worker and, on server restarts, rebuilds the asynchronous queue
from the jobs persisted in the database on previous runs,
re-queuing any queued jobs
stop()
async
Stops the worker coroutine, cancelling any in-flight wait and marking running jobs as failed
submit_job(job_id)
async
Adds an already persisted job to the queue for the worker to run
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `job_id` | `UUID` | The id of the job to enqueue | required |
Raises:
| Type | Description |
|---|---|
| `RuntimeError` | If the instance's queue hasn't been initialised with `start` |
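
The `RuntimeError` guard can be illustrated with a reduced sketch. `MiniQueue` is hypothetical, and it assumes the queue stays `None` until `start` creates it, which is how the attribute types above read but is not confirmed by the source.

```python
import asyncio
from typing import Optional
from uuid import UUID, uuid4


class MiniQueue:
    """Hypothetical reduction of the manager to its queue lifecycle."""

    def __init__(self) -> None:
        self.queue: Optional[asyncio.Queue] = None  # assumed: created by start()

    async def start(self) -> None:
        self.queue = asyncio.Queue()

    async def submit_job(self, job_id: UUID) -> None:
        if self.queue is None:
            raise RuntimeError("queue not initialised; call start() first")
        await self.queue.put(job_id)


async def demo() -> tuple:
    manager = MiniQueue()
    try:
        await manager.submit_job(uuid4())  # too early: no queue yet
        rejected = False
    except RuntimeError:
        rejected = True
    await manager.start()
    await manager.submit_job(uuid4())      # accepted once started
    return rejected, manager.queue.qsize()


rejected, size = asyncio.run(demo())
```
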
batch_convert_handler(job, processor)
async
Runs a batch of convert operations, one child job per file
The execution strategy is backend-specific. Modal fans the files across GPU containers, while the local backend runs them one at a time
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `job` | `JobDTO` | The batch parent job (its children hold the file ids) | required |
| `processor` | `Processor` | The conversion orchestrator | required |
Returns:
| Type | Description |
|---|---|
| `BatchResult` | The batch outcome counts and child ids |
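
The difference between the two execution strategies can be sketched generically. This is not the handler's code and does not use Modal's API: `BatchOutcome`, `run_batch`, `fake_convert`, and the `fan_out` flag are hypothetical, `asyncio.gather` merely stands in for a remote fan-out, and the field names of the real `BatchResult` are not shown in this page.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, List


@dataclass
class BatchOutcome:
    """Hypothetical result shape: outcome counts plus the child ids."""
    succeeded: int
    failed: int
    child_ids: List[str]


async def run_batch(
    child_ids: List[str],
    op: Callable[[str], Awaitable[None]],
    fan_out: bool,
) -> BatchOutcome:
    if fan_out:
        # Remote-style backend: start every child at once
        results = await asyncio.gather(
            *(op(c) for c in child_ids), return_exceptions=True
        )
    else:
        # Local-style backend: one child at a time
        results = []
        for child_id in child_ids:
            try:
                results.append(await op(child_id))
            except Exception as exc:
                results.append(exc)
    failed = sum(isinstance(r, Exception) for r in results)
    return BatchOutcome(len(results) - failed, failed, list(child_ids))


async def fake_convert(child_id: str) -> None:
    if child_id == "b":
        raise ValueError("unsupported codec")
    await asyncio.sleep(0)


local = asyncio.run(run_batch(["a", "b", "c"], fake_convert, fan_out=False))
remote = asyncio.run(run_batch(["a", "b", "c"], fake_convert, fan_out=True))
```

In both modes a failing child is counted rather than aborting the batch, so the two strategies report the same outcome for the same inputs.
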
batch_fix_srt_handler(job, processor)
async
Runs a batch of fix_srt operations, one child job per file
LLM SRT-fixing is provider-agnostic and I/O-bound, so the files run through the bounded-concurrency runner on every backend, capped by `MAX_LLM_CONCURRENCY`
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `job` | `JobDTO` | The batch parent job (its children hold the file ids) | required |
| `processor` | `Processor` | Unused (the LLM layer is provider-agnostic) | required |
Returns:
| Type | Description |
|---|---|
| `BatchResult` | The batch outcome counts and child ids |
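
A bounded-concurrency runner of the kind mentioned above is commonly built on `asyncio.Semaphore`. The sketch below shows that general pattern and is not the module's own runner: `run_bounded` and `fake_fix` are hypothetical, and the value given to `MAX_LLM_CONCURRENCY` here is chosen only for the example.

```python
import asyncio
from typing import Any, Awaitable, Callable, Iterable, List

MAX_LLM_CONCURRENCY = 2  # illustrative value; the real setting is not shown here


async def run_bounded(
    items: Iterable[Any],
    op: Callable[[Any], Awaitable[Any]],
    limit: int,
) -> List[Any]:
    semaphore = asyncio.Semaphore(limit)

    async def guarded(item: Any) -> Any:
        async with semaphore:  # at most `limit` calls are in flight
            return await op(item)

    # return_exceptions=True keeps one failing file from aborting the batch
    return await asyncio.gather(
        *(guarded(i) for i in items), return_exceptions=True
    )


async def demo() -> tuple:
    active = 0
    peak = 0

    async def fake_fix(file_id: int) -> str:
        nonlocal active, peak
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.01)  # stands in for the LLM request
        active -= 1
        return f"fixed-{file_id}"

    results = await run_bounded(range(5), fake_fix, MAX_LLM_CONCURRENCY)
    return results, peak


results, peak = asyncio.run(demo())
```

Since the work is I/O-bound, the cap mainly protects against provider rate limits rather than local CPU or GPU contention, which is why the same runner can apply on every backend.
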
batch_generate_srt_handler(job, processor)
async
Runs a batch of generate_srt operations, one child job per file
The execution strategy is backend-specific. Modal fans the files across containers, while the local backend runs them one at a time
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `job` | `JobDTO` | The batch parent job (its children hold the file ids) | required |
| `processor` | `Processor` | The transcription orchestrator | required |
Returns:
| Type | Description |
|---|---|
| `BatchResult` | The batch outcome counts and child ids |
batch_transcribe_handler(job, processor)
async
Runs a batch of transcribe operations, one child job per file
The execution strategy is backend-specific. Modal fans the files across containers, while the local backend runs them one at a time
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `job` | `JobDTO` | The batch parent job (its children hold the file ids) | required |
| `processor` | `Processor` | The transcription orchestrator | required |
Returns:
| Type | Description |
|---|---|
| `BatchResult` | The batch outcome counts and child ids |
convert_handler(job, processor)
async
Converts a profile video file to MP4 and stores the result
`job`: Expects a `file_id` and an optional `ConvertVideoRequest`-shaped `opts` in the job's params
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `job` | `JobDTO` | The single-op job to run | required |
| `processor` | `Processor` | The conversion orchestrator | required |
Returns:
| Type | Description |
|---|---|
| `ConvertResult` | The new MP4 file's id and media URL |
fix_srt_handler(job, processor)
async
Refines a profile SRT file with an LLM and stores the cleaned result
`job`: Expects a `file_id`, a `model` selector, and an optional `sys_msg` in the job's params
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `job` | `JobDTO` | The single-op job to run | required |
| `processor` | `Processor` | Unused (the LLM layer is provider-agnostic) | required |
Returns:
| Type | Description |
|---|---|
| `SrtResult` | The new SRT file's id, media URL, and content |
generate_srt_handler(job, processor)
async
Generates raw SRT from a profile video/audio file and stores it
`job`: Expects a `file_id` and an optional `GenerateSrtRequest`-shaped `opts` in the job's params
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `job` | `JobDTO` | The single-op job to run | required |
| `processor` | `Processor` | The transcription orchestrator | required |
Returns:
| Type | Description |
|---|---|
| `SrtResult` | The new SRT file's id, media URL, and content |
transcribe_handler(job, processor)
async
Transcribes a profile audio file to joined text and stores a transcript
`job`: Expects a `file_id` and an optional `TranscribeAudioRequest`-shaped `opts` in the job's params
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `job` | `JobDTO` | The single-op job to run | required |
| `processor` | `Processor` | The transcription orchestrator | required |
Returns:
| Type | Description |
|---|---|
| `TranscribeResult` | The new transcript's id and text |