Worker
class Worker (View source)
Traits
Constants
EXIT_SUCCESS |
|
EXIT_ERROR |
|
EXIT_MEMORY_LIMIT |
|
Properties
protected string | $name | The name of the worker. |
|
protected Factory | $manager | The queue manager instance. |
|
protected Dispatcher | $events | The event dispatcher instance. |
|
protected Repository | $cache | The cache repository implementation. |
|
protected ExceptionHandler | $exceptions | The exception handler instance. |
|
protected callable | $isDownForMaintenance | The callback used to determine if the application is in maintenance mode. |
|
protected callable | $resetScope | The callback used to reset the application's scope. |
|
bool | $shouldQuit | Indicates if the worker should exit. |
|
bool | $paused | Indicates if the worker is paused. |
|
static protected callable[] | $popCallbacks | The callbacks used to pop jobs from queues. |
Methods
Determine if the given exception was caused by a lost connection.
Create a new queue worker.
Listen to the given queue in a loop.
Reset the worker timeout handler.
Determine if the daemon should process on this iteration.
Determine the exit code to stop the process if necessary.
Process the next job on the queue.
Process the given job from the queue.
Handle an exception that occurred while the job was running.
Mark the given job as failed if it has exceeded the maximum allowed attempts.
Mark the given job as failed if it has exceeded the maximum allowed attempts.
Mark the given job as failed if it has exceeded the maximum allowed attempts.
Mark the given job as failed if it should fail on timeouts.
Raise the exception occurred queue job event.
Determine if the queue worker should restart.
Get the last queue restart timestamp, or null.
Enable async signals for the process.
Determine if "async" signals are supported.
Determine if the memory limit has been exceeded.
Stop listening and bail out of the script.
Sleep the script for a given number of seconds.
Set the name of the worker.
Register a callback to be executed to pick jobs.
Get the queue manager instance.
Details
protected bool
causedByLostConnection(Throwable $e)
Determine if the given exception was caused by a lost connection.
void
__construct(Factory $manager, Dispatcher $events, ExceptionHandler $exceptions, callable $isDownForMaintenance, callable $resetScope = null)
Create a new queue worker.
int
daemon(string $connectionName, string $queue, WorkerOptions $options)
Listen to the given queue in a loop.
protected void
registerTimeoutHandler(Job|null $job, WorkerOptions $options)
Register the worker timeout handler.
protected void
resetTimeoutHandler()
Reset the worker timeout handler.
protected int
timeoutForJob(Job|null $job, WorkerOptions $options)
Get the appropriate timeout for the given job.
protected bool
daemonShouldRun(WorkerOptions $options, string $connectionName, string $queue)
Determine if the daemon should process on this iteration.
protected int|null
pauseWorker(WorkerOptions $options, int $lastRestart)
Pause the worker for the current loop.
protected int|null
stopIfNecessary(WorkerOptions $options, int $lastRestart, int $startTime = 0, int $jobsProcessed = 0, mixed $job = null)
Determine the exit code to stop the process if necessary.
void
runNextJob(string $connectionName, string $queue, WorkerOptions $options)
Process the next job on the queue.
protected Job|null
getNextJob(Queue $connection, string $queue)
Get the next job from the queue connection.
protected void
runJob(Job $job, string $connectionName, WorkerOptions $options)
Process the given job.
protected void
stopWorkerIfLostConnection(Throwable $e)
Stop the worker if we have lost connection to a database.
void
process(string $connectionName, Job $job, WorkerOptions $options)
Process the given job from the queue.
protected void
handleJobException(string $connectionName, Job $job, WorkerOptions $options, Throwable $e)
Handle an exception that occurred while the job was running.
protected void
markJobAsFailedIfAlreadyExceedsMaxAttempts(string $connectionName, Job $job, int $maxTries)
Mark the given job as failed if it has exceeded the maximum allowed attempts.
This will likely be because the job previously exceeded a timeout.
protected void
markJobAsFailedIfWillExceedMaxAttempts(string $connectionName, Job $job, int $maxTries, Throwable $e)
Mark the given job as failed if it has exceeded the maximum allowed attempts.
protected void
markJobAsFailedIfWillExceedMaxExceptions(string $connectionName, Job $job, Throwable $e)
Mark the given job as failed if it has exceeded the maximum allowed attempts.
protected void
markJobAsFailedIfItShouldFailOnTimeout(string $connectionName, Job $job, Throwable $e)
Mark the given job as failed if it should fail on timeouts.
protected void
failJob(Job $job, Throwable $e)
Mark the given job as failed and raise the relevant event.
protected int
calculateBackoff(Job $job, WorkerOptions $options)
Calculate the backoff for the given job.
protected void
raiseBeforeJobEvent(string $connectionName, Job $job)
Raise the before queue job event.
protected void
raiseAfterJobEvent(string $connectionName, Job $job)
Raise the after queue job event.
protected void
raiseExceptionOccurredJobEvent(string $connectionName, Job $job, Throwable $e)
Raise the exception occurred queue job event.
protected bool
queueShouldRestart(int|null $lastRestart)
Determine if the queue worker should restart.
protected int|null
getTimestampOfLastQueueRestart()
Get the last queue restart timestamp, or null.
protected void
listenForSignals()
Enable async signals for the process.
protected bool
supportsAsyncSignals()
Determine if "async" signals are supported.
bool
memoryExceeded(int $memoryLimit)
Determine if the memory limit has been exceeded.
int
stop(int $status = 0, WorkerOptions|null $options = null)
Stop listening and bail out of the script.
never
kill(int $status = 0, WorkerOptions|null $options = null)
Kill the process.
protected MaxAttemptsExceededException
maxAttemptsExceededException(Job $job)
Create an instance of MaxAttemptsExceededException.
void
sleep(int|float $seconds)
Sleep the script for a given number of seconds.
$this
setCache(Repository $cache)
Set the cache repository implementation.
$this
setName(string $name)
Set the name of the worker.
static void
popUsing(string $workerName, callable $callback)
Register a callback to be executed to pick jobs.
Factory
getManager()
Get the queue manager instance.
void
setManager(Factory $manager)
Set the queue manager instance.