QueueService
Durable Queue Service for AtoM Heratio.
Provides dispatch, reserve, batch, chain, progress, and retry operations for background job processing. Generalizes the pattern from ahgAIPlugin's JobQueueService into a framework-level service available to all plugins.
Uses MySQL-backed queues with SELECT ... FOR UPDATE SKIP LOCKED for reliable worker reservation without external broker dependencies.
Table of Contents
Constants
- BACKOFF_EXPONENTIAL = 'exponential'
- BACKOFF_LINEAR = 'linear'
- BACKOFF_NONE = 'none'
- BATCH_CANCELLED = 'cancelled'
- BATCH_COMPLETED = 'completed'
- BATCH_FAILED = 'failed'
- BATCH_PAUSED = 'paused'
- BATCH_PENDING = 'pending'
- BATCH_RUNNING = 'running'
- STATUS_BADGES = ['pending' => 'secondary', 'reserved' => 'info', 'running' => 'primary', 'completed' => 'success', 'failed' => 'danger', 'cancelled' => 'warning', 'paused' => 'warning']
- STATUS_CANCELLED = 'cancelled'
- STATUS_COMPLETED = 'completed'
- STATUS_FAILED = 'failed'
- STATUS_PENDING = 'pending'
- STATUS_RESERVED = 'reserved'
- STATUS_RUNNING = 'running'
Methods
- addToBatch() : int
- Add jobs to a batch.
- cancelBatch() : bool
- Cancel a batch — cancels all pending/reserved jobs.
- cancelJob() : bool
- Cancel a single job.
- checkRateLimit() : bool
- Check if a rate limit group has capacity.
- cleanup() : int
- Cleanup old completed/cancelled jobs and logs.
- createBatch() : int
- Create a batch.
- dispatch() : int
- Dispatch a job to the queue.
- dispatchChain() : int
- Dispatch a chain of jobs (sequential execution).
- dispatchSync() : array<string|int, mixed>
- Dispatch and execute a job synchronously (no queue, immediate).
- flushFailed() : int
- Flush (delete) all failed jobs.
- getActiveWorkers() : array<string|int, mixed>
- Get active workers.
- getBatch() : object|null
- Get a single batch by ID.
- getBatchProgress() : array<string|int, mixed>
- Get batch progress.
- getFailedJobs() : array<string|int, mixed>
- Get failed jobs.
- getJob() : object|null
- Get a single job by ID.
- getLogEvents() : array<string|int, mixed>
- Get log events.
- getProgress() : array<string|int, mixed>
- Get job progress.
- getRecentJobs() : array<string|int, mixed>
- Get recent jobs with optional filters.
- getStats() : array<string|int, mixed>
- Get queue statistics.
- logEvent() : void
- Log a queue event.
- markCompleted() : void
- Mark a job as completed.
- markFailed() : void
- Mark a job as failed.
- markRunning() : void
- Mark a job as running.
- pauseBatch() : bool
- Pause a running batch — prevents new jobs from being picked up.
- queueNames() : array<string|int, mixed>
- Get human-readable queue names.
- recordRateUse() : void
- Record a rate limit usage.
- recoverStale() : int
- Recover stale jobs (reserved/running too long without heartbeat).
- reserveNext() : object|null
- Reserve the next available job for a worker.
- retryAllFailed() : int
- Retry all failed jobs.
- retryFailed() : int|null
- Retry a specific failed job by moving it from failed table back to queue.
- startBatch() : bool
- Start a batch — makes pending jobs available for workers.
- statusBadge() : string
- Get a Bootstrap badge class for a status.
- updateProgress() : void
- Update job progress.
Constants
BACKOFF_EXPONENTIAL
public mixed BACKOFF_EXPONENTIAL = 'exponential'
BACKOFF_LINEAR
public mixed BACKOFF_LINEAR = 'linear'
BACKOFF_NONE
public mixed BACKOFF_NONE = 'none'
BATCH_CANCELLED
public mixed BATCH_CANCELLED = 'cancelled'
BATCH_COMPLETED
public mixed BATCH_COMPLETED = 'completed'
BATCH_FAILED
public mixed BATCH_FAILED = 'failed'
BATCH_PAUSED
public mixed BATCH_PAUSED = 'paused'
BATCH_PENDING
public mixed BATCH_PENDING = 'pending'
BATCH_RUNNING
public mixed BATCH_RUNNING = 'running'
STATUS_BADGES
public mixed STATUS_BADGES = ['pending' => 'secondary', 'reserved' => 'info', 'running' => 'primary', 'completed' => 'success', 'failed' => 'danger', 'cancelled' => 'warning', 'paused' => 'warning']
STATUS_CANCELLED
public mixed STATUS_CANCELLED = 'cancelled'
STATUS_COMPLETED
public mixed STATUS_COMPLETED = 'completed'
STATUS_FAILED
public mixed STATUS_FAILED = 'failed'
STATUS_PENDING
public mixed STATUS_PENDING = 'pending'
STATUS_RESERVED
public mixed STATUS_RESERVED = 'reserved'
STATUS_RUNNING
public mixed STATUS_RUNNING = 'running'
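The STATUS_BADGES map pairs each job or batch status with a Bootstrap contextual colour name. A minimal sketch of how a lookup such as statusBadge() might use it follows; the function and constant names, the 'secondary' fallback for unknown statuses, and the `bg-` class prefix in the usage line are all assumptions, not confirmed behaviour of the service.

```php
<?php
// Map copied from the STATUS_BADGES constant documented above.
const STATUS_BADGES_SKETCH = [
    'pending'   => 'secondary',
    'reserved'  => 'info',
    'running'   => 'primary',
    'completed' => 'success',
    'failed'    => 'danger',
    'cancelled' => 'warning',
    'paused'    => 'warning',
];

/**
 * Hypothetical stand-in for statusBadge(): look the status up and fall
 * back to 'secondary' when it is unknown (the fallback is an assumption).
 */
function statusBadgeSketch(string $status): string
{
    return STATUS_BADGES_SKETCH[$status] ?? 'secondary';
}

// Assumed Bootstrap 5 markup; the real templates may build the class differently.
echo '<span class="badge bg-' . statusBadgeSketch('failed') . '">failed</span>', PHP_EOL;
```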
Methods
addToBatch()
Add jobs to a batch.
public addToBatch(int $batchId, array<string|int, mixed> $jobs) : int
Parameters
- $batchId : int
- $jobs : array<string|int, mixed>
  Array of ['job_type' => string, 'payload' => array, ...]
Return values
int: Number of jobs added
cancelBatch()
Cancel a batch — cancels all pending/reserved jobs.
public cancelBatch(int $batchId) : bool
Parameters
- $batchId : int
Return values
bool
cancelJob()
Cancel a single job.
public cancelJob(int $jobId) : bool
Parameters
- $jobId : int
Return values
bool
checkRateLimit()
Check if a rate limit group has capacity.
public checkRateLimit(string $group) : bool
Parameters
- $group : string
Return values
bool
cleanup()
Cleanup old completed/cancelled jobs and logs.
public cleanup([int $days = 30 ]) : int
Parameters
- $days : int = 30
  Delete items older than this many days
Return values
int: Total rows deleted
createBatch()
Create a batch.
public createBatch(array<string|int, mixed> $data) : int
Parameters
- $data : array<string|int, mixed>
Return values
int: Batch ID
dispatch()
Dispatch a job to the queue.
public dispatch(string $jobType[, array<string|int, mixed> $payload = [] ][, string $queue = 'default' ][, int $priority = 5 ][, int $delaySeconds = 0 ][, int $maxAttempts = 0 ][, int|null $userId = null ][, string|null $rateLimitGroup = null ]) : int
Parameters
- $jobType : string
  Handler identifier (e.g. 'ingest:commit')
- $payload : array<string|int, mixed> = []
  Job-specific arguments
- $queue : string = 'default'
  Queue name (default, ai, ingest, export, sync)
- $priority : int = 5
  1 = highest, 9 = lowest
- $delaySeconds : int = 0
  Delay before first attempt
- $maxAttempts : int = 0
  Maximum retry attempts
- $userId : int|null = null
  Dispatching user
- $rateLimitGroup : string|null = null
  Rate limiter group
Return values
int: Job ID
dispatchChain()
Dispatch a chain of jobs (sequential execution).
public dispatchChain(array<string|int, mixed> $jobs[, string $queue = 'default' ][, int|null $userId = null ]) : int
Each job in the chain runs only after the previous one completes. If a job fails after max retries, subsequent chain jobs are cancelled.
Parameters
- $jobs : array<string|int, mixed>
  Array of ['job_type' => string, 'payload' => array, ...]
- $queue : string = 'default'
  Queue name
- $userId : int|null = null
Return values
int: Chain ID (matches first job's chain_id)
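The chain rule described for dispatchChain() (run in order, stop at the first permanent failure, cancel the rest) can be illustrated with a small in-memory simulation. This is only a sketch of the semantics, not the service's implementation: simulateChain() and the job types 'ingest:validate' and 'ingest:index' are hypothetical, and retries are collapsed into a single success/failure result from the handler.

```php
<?php
/**
 * Simulate chain semantics in memory: run jobs in order, stop at the
 * first failure, and mark every later job as cancelled.
 *
 * @param array<int, array{job_type: string, payload: array}> $jobs
 * @param callable $handler receives one job array, returns true on success
 * @return array<int, string> final status per job, in chain order
 */
function simulateChain(array $jobs, callable $handler): array
{
    $statuses = [];
    $failed = false;
    foreach ($jobs as $job) {
        if ($failed) {
            // A previous link failed for good, so this one never runs.
            $statuses[] = 'cancelled';
            continue;
        }
        if ($handler($job)) {
            $statuses[] = 'completed';
        } else {
            $statuses[] = 'failed';
            $failed = true;
        }
    }
    return $statuses;
}

$chain = [
    ['job_type' => 'ingest:validate', 'payload' => ['session' => 7]],
    ['job_type' => 'ingest:commit',   'payload' => ['session' => 7]],
    ['job_type' => 'ingest:index',    'payload' => ['session' => 7]],
];

// Pretend the second step fails after exhausting its retries.
$result = simulateChain($chain, fn(array $job): bool => $job['job_type'] !== 'ingest:commit');
print_r($result); // completed, failed, cancelled
```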
dispatchSync()
Dispatch and execute a job synchronously (no queue, immediate).
public dispatchSync(string $jobType[, array<string|int, mixed> $payload = [] ]) : array<string|int, mixed>
Parameters
- $jobType : string
- $payload : array<string|int, mixed> = []
Return values
array<string|int, mixed>: Result data from the handler
flushFailed()
Flush (delete) all failed jobs.
public flushFailed() : int
Return values
int: Number deleted
getActiveWorkers()
Get active workers.
public getActiveWorkers() : array<string|int, mixed>
Return values
array<string|int, mixed>
getBatch()
Get a single batch by ID.
public getBatch(int $id) : object|null
Parameters
- $id : int
Return values
object|null
getBatchProgress()
Get batch progress.
public getBatchProgress(int $batchId) : array<string|int, mixed>
Parameters
- $batchId : int
Return values
array<string|int, mixed>
getFailedJobs()
Get failed jobs.
public getFailedJobs([int $limit = 50 ][, int $page = 1 ]) : array<string|int, mixed>
Parameters
- $limit : int = 50
- $page : int = 1
Return values
array<string|int, mixed>
getJob()
Get a single job by ID.
public getJob(int $id) : object|null
Parameters
- $id : int
Return values
object|null
getLogEvents()
Get log events.
public getLogEvents([int|null $jobId = null ][, int|null $batchId = null ][, int $limit = 50 ]) : array<string|int, mixed>
Parameters
- $jobId : int|null = null
- $batchId : int|null = null
- $limit : int = 50
Return values
array<string|int, mixed>
getProgress()
Get job progress.
public getProgress(int $jobId) : array<string|int, mixed>
Parameters
- $jobId : int
Return values
array<string|int, mixed>
getRecentJobs()
Get recent jobs with optional filters.
public getRecentJobs([array<string|int, mixed> $filters = [] ][, int $limit = 25 ][, int $page = 1 ]) : array<string|int, mixed>
Parameters
- $filters : array<string|int, mixed> = []
- $limit : int = 25
- $page : int = 1
Return values
array<string|int, mixed>
getStats()
Get queue statistics.
public getStats([string|null $queue = null ]) : array<string|int, mixed>
Parameters
- $queue : string|null = null
Return values
array<string|int, mixed>
logEvent()
Log a queue event.
public logEvent(int|null $jobId, int|null $batchId, string $eventType, string $message[, array<string|int, mixed> $details = [] ]) : void
Parameters
- $jobId : int|null
- $batchId : int|null
- $eventType : string
- $message : string
- $details : array<string|int, mixed> = []
markCompleted()
Mark a job as completed.
public markCompleted(int $jobId[, array<string|int, mixed> $resultData = [] ][, int $processingTimeMs = 0 ]) : void
Also handles chain advancement and batch progress updates.
Parameters
- $jobId : int
- $resultData : array<string|int, mixed> = []
- $processingTimeMs : int = 0
markFailed()
Mark a job as failed.
public markFailed(int $jobId, string $errorMessage[, mixed $errorCode = null ][, string|null $errorTrace = null ]) : void
If retries remain, reschedules with backoff. Otherwise moves to failed table.
Parameters
- $jobId : int
- $errorMessage : string
- $errorCode : mixed = null
- $errorTrace : string|null = null
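The reference does not say how the retry delay is derived from the three BACKOFF_* strategies. One common interpretation is sketched below; the formulas, the 10-second base, and the function name backoffDelaySketch() are assumptions made for illustration only.

```php
<?php
/**
 * Seconds to wait before the next attempt.
 * $attempt is 1 for the first retry; $baseSeconds is an assumed tunable.
 */
function backoffDelaySketch(string $strategy, int $attempt, int $baseSeconds = 10): int
{
    $attempt = max(1, $attempt); // guard against zero or negative counters

    switch ($strategy) {
        case 'exponential': // BACKOFF_EXPONENTIAL: base, 2*base, 4*base, ...
            return $baseSeconds * (2 ** ($attempt - 1));
        case 'linear':      // BACKOFF_LINEAR: base, 2*base, 3*base, ...
            return $baseSeconds * $attempt;
        case 'none':        // BACKOFF_NONE: retry immediately
        default:
            return 0;
    }
}

foreach (['exponential', 'linear', 'none'] as $strategy) {
    echo $strategy, ': ', backoffDelaySketch($strategy, 3), ' s', PHP_EOL;
}
```

With the assumed base of 10 seconds, the third retry waits 40 seconds under the exponential rule, 30 under the linear rule, and 0 with no backoff.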
markRunning()
Mark a job as running.
public markRunning(int $jobId, string $workerId) : void
Parameters
- $jobId : int
- $workerId : string
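Taken together, reserveNext(), markRunning(), markCompleted() and markFailed() suggest a worker lifecycle. The sketch below shows one way a worker might process a single job. The WorkerQueue interface only mirrors the signatures documented on this page; processOneJob(), and the assumption that a reserved row exposes an `id` and a JSON-encoded `payload`, are hypothetical.

```php
<?php
/** Subset of the documented QueueService methods that a worker needs. */
interface WorkerQueue
{
    public function reserveNext($queues, string $workerId): ?object;
    public function markRunning(int $jobId, string $workerId): void;
    public function markCompleted(int $jobId, array $resultData = [], int $processingTimeMs = 0): void;
    public function markFailed(int $jobId, string $errorMessage, $errorCode = null, ?string $errorTrace = null): void;
}

/**
 * Process at most one job. Returns true if a job was reserved.
 * Assumes the reserved row has `id` and a JSON `payload` property;
 * neither is confirmed by the reference.
 */
function processOneJob(WorkerQueue $queue, array $queues, string $workerId, callable $handler): bool
{
    $job = $queue->reserveNext($queues, $workerId);
    if ($job === null) {
        return false; // nothing available right now
    }

    $jobId = (int) $job->id;
    $queue->markRunning($jobId, $workerId);
    $start = microtime(true);

    try {
        $payload = json_decode((string) ($job->payload ?? '[]'), true) ?: [];
        $result = (array) $handler($payload);
        $elapsedMs = (int) round((microtime(true) - $start) * 1000);
        $queue->markCompleted($jobId, $result, $elapsedMs);
    } catch (\Throwable $e) {
        // markFailed() decides between rescheduling and the failed table.
        $queue->markFailed($jobId, $e->getMessage(), $e->getCode(), $e->getTraceAsString());
    }

    return true;
}
```

A long-running worker would call processOneJob() in a loop and sleep briefly whenever it returns false.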
pauseBatch()
Pause a running batch — prevents new jobs from being picked up.
public pauseBatch(int $batchId) : bool
Parameters
- $batchId : int
Return values
bool
queueNames()
Get human-readable queue names.
public static queueNames() : array<string|int, mixed>
Return values
array<string|int, mixed>
recordRateUse()
Record a rate limit usage.
public recordRateUse(string $group) : void
Parameters
- $group : string
recoverStale()
Recover stale jobs (reserved/running too long without heartbeat).
public recoverStale([int $timeoutMinutes = 10 ]) : int
Parameters
- $timeoutMinutes : int = 10
  Jobs reserved/running longer than this are considered stale
Return values
int: Number of recovered jobs
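The staleness test behind recoverStale() can be pictured as a simple time comparison. The sketch below assumes each job carries a last-heartbeat Unix timestamp, which the reference does not confirm; isStaleSketch() is a hypothetical helper, not part of the service.

```php
<?php
/**
 * Decide whether a job counts as stale.
 * $lastHeartbeat and $now are Unix timestamps; tracking a heartbeat
 * timestamp per job is an assumption about the underlying table.
 */
function isStaleSketch(string $status, int $lastHeartbeat, int $now, int $timeoutMinutes = 10): bool
{
    // Only in-flight jobs can go stale.
    $inFlight = in_array($status, ['reserved', 'running'], true);

    return $inFlight && ($now - $lastHeartbeat) > $timeoutMinutes * 60;
}

$now = time();
var_dump(isStaleSketch('running', $now - 900, $now)); // 15 minutes of silence
var_dump(isStaleSketch('running', $now - 60, $now));  // heartbeat one minute ago
```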
reserveNext()
Reserve the next available job for a worker.
public reserveNext(string|array<string|int, mixed> $queues, string $workerId) : object|null
Uses SELECT ... FOR UPDATE SKIP LOCKED for safe concurrent reservation.
Parameters
- $queues : string|array<string|int, mixed>
  Queue name(s) to poll
- $workerId : string
  Worker process identifier
Return values
object|null: The reserved job row, or null if none available
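To make the SKIP LOCKED reservation pattern concrete, the sketch below builds the kind of query a worker might issue. The table name `queue_job` and the columns `status`, `queue`, `available_at`, and `priority` are hypothetical, since the reference does not list the schema, and buildReserveQuery() is not part of the service.

```php
<?php
/**
 * Build a reservation query for one or more queues.
 * Table and column names are hypothetical placeholders.
 *
 * @param string|string[] $queues
 * @return array{0: string, 1: array} SQL text and its bound parameters
 */
function buildReserveQuery($queues): array
{
    $queues = array_values((array) $queues);
    if ($queues === []) {
        throw new InvalidArgumentException('At least one queue name is required.');
    }

    // One "?" placeholder per queue name keeps the query parameterized.
    $placeholders = implode(', ', array_fill(0, count($queues), '?'));

    $sql = "SELECT id FROM queue_job"
         . " WHERE status = 'pending' AND queue IN ($placeholders)"
         . " AND available_at <= NOW()"
         . " ORDER BY priority ASC, id ASC"   // 1 = highest priority
         . " LIMIT 1 FOR UPDATE SKIP LOCKED"; // skip rows other workers hold

    return [$sql, $queues];
}

[$sql, $params] = buildReserveQuery(['default', 'ingest']);
echo $sql, PHP_EOL;
```

For the lock to mean anything, the SELECT has to run inside a transaction that also updates the chosen row (for example, to the 'reserved' status) before committing. SKIP LOCKED requires MySQL 8.0 or later.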
retryAllFailed()
Retry all failed jobs.
public retryAllFailed() : int
Return values
int: Number of jobs retried
retryFailed()
Retry a specific failed job by moving it from failed table back to queue.
public retryFailed(int $failedId) : int|null
Parameters
- $failedId : int
Return values
int|null
startBatch()
Start a batch — makes pending jobs available for workers.
public startBatch(int $batchId) : bool
Parameters
- $batchId : int
Return values
bool
statusBadge()
Get a Bootstrap badge class for a status.
public static statusBadge(string $status) : string
Parameters
- $status : string
Return values
string
updateProgress()
Update job progress.
public updateProgress(int $jobId, int $current, int $total[, string $message = '' ]) : void
Parameters
- $jobId : int
- $current : int
- $total : int
- $message : string = ''
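A caller reporting progress usually wants a percentage derived from $current and $total. The helper below is a hypothetical sketch of that arithmetic; whether the service stores or returns a percentage, and how it treats a zero total, is not stated in this reference.

```php
<?php
/**
 * Convert a current/total pair into a whole-number percentage (0 to 100).
 * Treating a non-positive total as 0% is an assumption.
 */
function progressPercentSketch(int $current, int $total): int
{
    if ($total <= 0) {
        return 0; // avoid division by zero before the total is known
    }

    $percent = (int) floor($current * 100 / $total);

    return max(0, min(100, $percent)); // clamp overshoot and negatives
}

echo progressPercentSketch(25, 200), '%', PHP_EOL; // 12%
```

A handler might call updateProgress($jobId, 25, 200, 'Processed 25 of 200') every few items and leave the percentage calculation to the UI layer.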