Search code examples
phpshellcakephpcronqueue

Stop concurrent php jobs jobs run via cron job


I'm using Queuesadilla job queue plugin for CakePhp to manage several heavy duty tasks as data extraction from excel and so. I run the shell command with a cron job with an interval. However, if there are more than one job listed in the queue; shell command starts the worker on the next job without waiting the previous one's ending or failure. Since I want to run shell command with a very small interval as 5seconds and some of the jobs take 3 mins to run completely; I want worker to wait for the previous job to complete and not start till then.

However when I run a cronjob with bin/cake CustomQueuesadilla; obviously it starts with a new PID and so on without knowing if the previous one still running. When I set my worker's runtime as 20seconds and set the cron job interval as 3 minutes; I have a job working every 3 minutes. But this also means if there are no jobs, it will wait 3 minutes - 20 seconds to run again. I want to reduce this time window.

I'm open to architectural changes and different libraries as well.


Solution

  • Queue vs Cron

    What you're trying to implement is a looping queue. Cron is only intended for scheduled events.

    1. Build your queue manager/worker
      • This worker calls the other commands.
      • Such calls should be wrapped in try/catch, as well as checking for exit codes.
      • You should probably log (time, command [, error]) of every call.
      • Build it as a singleton.
    2. Add a cron entry to call a command which gets the Manager (singleton).
      • As a result of singleton pattern, if it stopped, it will start again.
      • Unless you store the state of the queue outside of that Manager (file or db), it may always die at cmd4 for example, and never run subsequent commands.

    Manager ex

    class CommandQueueLoop {
        const QUEUE = [
            'app:command1' => [
                'command' => 'app:command1',
            ],
            'app:command2' => [
                'command' => 'app:command2',
                'somearg' => 123
            ],
            'app:command3' => [
                'command' => 'app:command3',
            ],
        ];
    
        private static $queueManager;
    
        private $activeQueue;
        private $app;
        private $input;
        private $output;
        private $running;
    
        public function __construct(Application $app, InputInterface $input, OutputInterface $output)
        {
            $this->app = $app;
            $this->input = $input;
            $this->output = $output;
            $this->running = false;
            $this->start();
        }
    
        public static function getInstance(Application $app, InputInterface $input, OutputInterface $output)
        {
            if (!self::$queueManager instanceof self) {
                self::$queueManager = new self();
            }
        }
    
        public function start(array $queue = null)
        {
            $this->activeQueue = $queue ?? self::QUEUE;
            $this->running = true;
            $this->run($this->activeQueue);
        }
    
        public function stop()
        {
            $this->running = false;
            $this->activeQueue = null;
        }
    
        private function run(array $queue)
        {
            if (!$this->running) {
                return;
            }
    
            foreach ($queue as $cmd => $args) {
                $this->call($cmd, $args);
            }
    
            $this->run($this->activeQueue);
        }
    
        private function call(string $command, array $args)
        {
            try {
                $cmd = $this->app->find($command);
                $inp = new ArrayInput($args);
                $returnCode = $cmd->run($inp, $this->output);
            } catch (Exception $e) {
                // log failure
                unset($this->activeQueue[$cmd]); // remove from queue
            }
    
            // check returnCode & log
        }
    }
    

    Cron Command to ensure Queue is Running

    Add this command, and only this command to the cron, once an hour or so. When this runs, if the command has stopped, it will start again.

    class QueueLoopCommand extends Command
    {
        ...
    
        public function execute(InputInterface $input, OutputInterface $output)
        {
            $queueLoop = CommandQueueLoop::getInstance($this->getApplication(), $input, $output);
        }
    }
    

    Here be dragons

    This "manager" needs to be written very carefully, with good logging, and an emergency stop of some kind. Additionally, that Cron Command should be calling host of other methods in the "manager" to check status, last runs, double-check for multiple threads, etc., and do something about unexpected states. Building tools like these get deep fast, and can lead you down rabbit-holes like you wouldn't believe.

    Major dangers:

    • Memory leaks
    • Multiple conflicting threads
    • Unintended Race Conditions