How Supervisor runs Laravel queue workers
- Tram Ho
Introduction
Have you ever wondered how the Laravel queue really works? I recently had the chance to dig into it, so I want to walk through it in detail.
Classes behind queue:work
WorkCommand
The class that registers the queue:work command with Laravel.
Worker
A single processing unit. It creates the processing loop that takes jobs off the queue and runs them (you can think of it as one process on the CPU).
```php
/**
 * Listen to the given queue in a loop.
 *
 * @param  string  $connectionName
 * @param  string  $queue
 * @param  \Illuminate\Queue\WorkerOptions  $options
 * @return int
 */
public function daemon($connectionName, $queue, WorkerOptions $options)
{
    if ($this->supportsAsyncSignals()) {
        $this->listenForSignals();
    }

    $lastRestart = $this->getTimestampOfLastQueueRestart();

    [$startTime, $jobsProcessed] = [hrtime(true) / 1e9, 0];

    while (true) {
        // Before reserving any jobs, we will make sure this queue is not paused and
        // if it is we will just pause this worker for a given amount of time and
        // make sure we do not need to kill this worker process off completely.
        if (! $this->daemonShouldRun($options, $connectionName, $queue)) {
            $status = $this->pauseWorker($options, $lastRestart);

            if (! is_null($status)) {
                return $this->stop($status);
            }

            continue;
        }

        // First, we will attempt to get the next job off of the queue. We will also
        // register the timeout handler and reset the alarm for this job so it is
        // not stuck in a frozen state forever. Then, we can fire off this job.
        $job = $this->getNextJob(
            $this->manager->connection($connectionName), $queue
        );

        if ($this->supportsAsyncSignals()) {
            $this->registerTimeoutHandler($job, $options);
        }

        // If the daemon should run (not in maintenance mode, etc.), then we can run
        // fire off this job for processing. Otherwise, we will need to sleep the
        // worker so no more jobs are processed until they should be processed.
        if ($job) {
            $jobsProcessed++;

            $this->runJob($job, $connectionName, $options);

            if ($options->rest > 0) {
                $this->sleep($options->rest);
            }
        } else {
            $this->sleep($options->sleep);
        }

        if ($this->supportsAsyncSignals()) {
            $this->resetTimeoutHandler();
        }

        // Finally, we will check to see if we have exceeded our memory limits or if
        // the queue should restart based on other indications. If so, we'll stop
        // this worker and let whatever is "monitoring" it restart the process.
        $status = $this->stopIfNecessary(
            $options, $lastRestart, $startTime, $jobsProcessed, $job
        );

        if (! is_null($status)) {
            return $this->stop($status);
        }
    }
}
```
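- The worker runs a continuous loop.
- If the worker should not run right now (for example, the queue is paused or the application is in maintenance mode), it pauses for a while and skips to the next iteration without taking a job.
- It gets the next job from the queue connection (database, Redis, …).
- If there is a job, the worker processes it.
- After processing a job, it rests for $options->rest seconds if that option is set. When no job was found, it sleeps for $options->sleep seconds. It then checks whether it has to stop (memory limit, restart signal, and so on) and otherwise runs the loop again to fetch the next job.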
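The steps above can be sketched without Laravel at all. The following is a simplified illustration, not the framework's code: the function name runWorker, the array-of-closures queue, and the $maxJobs and $maxIdle stop conditions are all assumptions made so the example terminates on its own.

```php
<?php

// Hypothetical stand-in for the worker loop; this is not Laravel's Worker class.
// The queue is a plain array of callables. The loop stops after $maxJobs jobs
// or after $maxIdle consecutive empty polls so that the example terminates.
function runWorker(array &$queue, int $maxJobs, int $sleepSeconds = 0, int $maxIdle = 1): int
{
    $jobsProcessed = 0;
    $idlePolls = 0;

    while (true) {
        // Take the next job off the front of the queue (null when it is empty).
        $job = array_shift($queue);

        if ($job !== null) {
            $jobsProcessed++;
            $job();               // process the job
            $idlePolls = 0;
        } else {
            $idlePolls++;
            sleep($sleepSeconds); // nothing to do, wait before polling again
        }

        // Stop conditions, playing the role that stopIfNecessary() has above.
        if ($jobsProcessed >= $maxJobs || $idlePolls >= $maxIdle) {
            return $jobsProcessed;
        }
    }
}

$log = [];
$queue = [
    function () use (&$log) { $log[] = 'send email'; },
    function () use (&$log) { $log[] = 'resize image'; },
];

$processed = runWorker($queue, 10);
```

A real worker has no built-in limit like $maxIdle. It keeps polling until something outside the loop (a memory limit, a restart signal, a process monitor) tells it to stop.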
Queue
The class that builds the job payload and holds the job, event, and connection settings used to store jobs on the queue. Some of its methods:
- createPayload
- createObjectPayload
- getDisplayName
- getJobBackoff
- getJobExpiration
- createStringPayload
- pushOn
- laterOn
- bulk
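To give a feel for what "creating a payload" means, here is a minimal sketch of serializing a job description to JSON before storing it. The function createSketchPayload and its field names are made up for illustration and are not the payload format Laravel produces.

```php
<?php

// Hypothetical payload builder; the field names are illustrative only
// and do not reproduce Laravel's real payload structure.
function createSketchPayload(string $jobClass, array $data, int $maxTries = 1): string
{
    $payload = [
        'id'          => bin2hex(random_bytes(16)), // random identifier for this job
        'displayName' => $jobClass,                 // human-readable job name
        'maxTries'    => $maxTries,                 // how often the job may be attempted
        'data'        => $data,                     // arguments the job needs when it runs
    ];

    // JSON_THROW_ON_ERROR raises an exception instead of silently returning false.
    return json_encode($payload, JSON_THROW_ON_ERROR);
}

$raw = createSketchPayload('SendWelcomeEmail', ['userId' => 42], 3);

// The worker side would decode the stored string again before running the job.
$decoded = json_decode($raw, true, 512, JSON_THROW_ON_ERROR);
```

Storing the job as a plain string is what lets the same payload live in a database row, a Redis list, or any other backend.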
Job
The smallest unit of work. A job is pushed onto the queue and processed later, one at a time, by a worker.
```php
// Selected methods only; bodies omitted.
abstract class Job
{
    public function uuid() { /* ... */ }
    public function fire() { /* ... */ }
    public function delete() { /* ... */ }
    public function isDeleted() { /* ... */ }
    public function release($delay = 0) { /* ... */ }
    protected function failed($e) { /* ... */ }
}
```
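As a rough illustration of how the methods listed above fit together, the sketch below runs a job, deletes it on success, and releases it for a retry on failure. SketchJob, isReleased, and process are hypothetical names for this example; Laravel's real handling has more cases (attempt limits, delays, failed-job storage).

```php
<?php

// Hypothetical minimal job, loosely modeled on the method names above.
// It is not Laravel's Job class.
final class SketchJob
{
    /** @var callable */
    private $handler;
    private bool $deleted = false;
    private bool $released = false;

    public function __construct(callable $handler)
    {
        $this->handler = $handler;
    }

    public function fire(): void
    {
        ($this->handler)(); // run the actual work
    }

    public function delete(): void { $this->deleted = true; }
    public function isDeleted(): bool { return $this->deleted; }
    public function release(): void { $this->released = true; }
    public function isReleased(): bool { return $this->released; }
}

// Assumed lifecycle for this sketch: success removes the job,
// any exception puts it back so it can be tried again.
function process(SketchJob $job): void
{
    try {
        $job->fire();
        $job->delete();
    } catch (Throwable $e) {
        $job->release();
    }
}

$ok = new SketchJob(function () { /* work succeeds */ });
$bad = new SketchJob(function () { throw new RuntimeException('boom'); });

process($ok);
process($bad);
```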
Classes behind queue:listen
This command is usually used for local development. It works like queue:work above, but each job runs in a freshly started process, so changes to the source code are picked up without manually restarting the worker.
ListenCommand
The class that defines the queue:listen command.
Listener
The class that runs the process for the queue:listen command. As with the worker above, it runs inside a while (true) loop.
```php
/**
 * Listen to the given queue connection.
 *
 * @param  string  $connection
 * @param  string  $queue
 * @param  \Illuminate\Queue\ListenerOptions  $options
 * @return void
 */
public function listen($connection, $queue, ListenerOptions $options)
{
    $process = $this->makeProcess($connection, $queue, $options);

    while (true) {
        $this->runProcess($process, $options->memory);
    }
}
```
Running queue workers with Supervisor
With Supervisor we can start any number of workers, chosen to match the business requirements and the server configuration. For a mid-range server, I think 8 to 10 workers is usually enough.
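As a starting point, a Supervisor program section for eight workers might look like the sketch below. The program name laravel-worker, the paths under /path/to/your-app, the user www-data, and the queue:work options are placeholders chosen for this example and need to be adapted to your server.

```ini
; Hypothetical example: name, paths, user, and options are placeholders.
[program:laravel-worker]
; Give each of the numprocs processes a unique name.
process_name=%(program_name)s_%(process_num)02d
command=php /path/to/your-app/artisan queue:work --sleep=3 --tries=3
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
user=www-data
; Number of worker processes Supervisor keeps running.
numprocs=8
redirect_stderr=true
stdout_logfile=/path/to/your-app/storage/logs/worker.log
```

After saving the file in Supervisor's configuration directory, the usual sequence is supervisorctl reread, then supervisorctl update, then supervisorctl start "laravel-worker:*". Because autorestart is enabled, a worker that exits (for example after hitting its memory limit in the loop shown earlier) is started again by Supervisor.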