Search code examples
phpcronlimitpopen

PHP popen process limit?


I'm trying to extract a time consuming task to a separate process. Unfortunately, multi-threading does not really seem to be an option with PHP, but you can create new php processes, using popen.

The use case is this: there is a cronjob that runs every minute, which checks if there are any email campaigns that need to be sent. There could be multiple campaigns that need to be sent at the exact same time, but as of now it just picks up one campaign every minute. I would like to extract sending of the campaigns to a separate process, so that I can send multiple campaigns at the same time.

The code looks something like this (note that this is just a proof of concept):

crontab

* * * * * root /usr/local/bin/php /var/www/maintask.php 2>&1

maintask.php

for ($i = 0; $i < 4; $i++) {
    $processName = "Process_{$i}";
    echo "Spawn process {$processName}" . PHP_EOL;

    $process = popen("php subtask.php?process_name={$processName} 2>&1", "r");
    stream_set_blocking($process, false);
}

subtask.php

$process = $_GET['process_name'];

echo "Started sleeping process {$process}" . PHP_EOL;
sleep(rand(10, 40));
echo "Stopped sleeping process  {$process}" . PHP_EOL;

Now, the problem I'm having is that popen will only spawn 2 processes at any time, while I'm trying to spawn 4. I can not figure out why. There doesn't appear to be any limit documented. Perhaps this is limited by the amount of cores I have available?


Solution

  • I modified subtask.php so you can see when each task starts, ends and how long it is intending to wait. now you can see when a process starts/stops you can reduce the sleep times - don't need to use ps -aux to show when processes are running

    subtask.php

    <?php
    $process = $argv[1];
    
    $sleepTime = rand(1, 10);
    echo date('Y-m-d H:i:s') . " - Started sleeping process {$process} ({$sleepTime})" . PHP_EOL;
    sleep($sleepTime);
    echo date('Y-m-d H:i:s') . " - Stopped sleeping process {$process}" . PHP_EOL;
    

    I've added the Class into the maintask.php code so you can test it... the fun starts when you queue() more entries than you have set maxProcesses (try 32)
    NOTE: the results will come back in the order they complete

    maintask.php

    <?php
    class ParallelProcess
    {
        private $maxProcesses = 16; // maximum processes
        private $arrProcessQueue = [];
        private $arrCommandQueue = [];
    
        private function __construct()
        {
        }
    
        private function __clone()
        {
        }
    
        /**
         *
         * @return \static
         */
        public static function create()
        {
            $result = new static();
            return $result;
        }
    
        /**
         *
         * @param int $maxProcesses
         * @return \static
         */
        public static function load($maxProcesses = 16)
        {
            $result = self::create();
            $result->setMaxProcesses($maxProcesses);
            return $result;
        }
    
        /**
         * get maximum processes
         *
         * @return int
         */
        public function getMaxProcesses()
        {
            return $this->maxProcesses;
        }
    
        /**
         * set maximum processes
         *
         * @param int $maxProcesses
         * @return $this
         */
        public function setMaxProcesses($maxProcesses)
        {
            $this->maxProcesses = $maxProcesses;
            return $this;
        }
    
        /**
         * number of entries in the process queue
         *
         * @return int
         */
        public function processQueueLength()
        {
            $result = count($this->arrProcessQueue);
            return $result;
        }
    
        /**
         * number of entries in the command queue
         *
         * @return int
         */
        public function commandQueueLength()
        {
            $result = count($this->arrCommandQueue);
            return $result;
        }
    
    
        /**
         * process open
         *
         * @staticvar array $arrDescriptorspec
         * @param string $strCommand
         * @return $this
         * @throws \Exception
         */
        private function p_open($strCommand)
        {
            static $arrDescriptorSpec = array(
                0 => array('file', '/dev/null', 'r'), // stdin is a file that the child will reda from
                1 => array('pipe', 'w'), // stdout is a pipe that the child will write to
                2 => array('file', '/dev/null', 'w') // stderr is a pipe that the child will write to
            );
    
            $arrPipes = array();
            if (($resProcess = proc_open($strCommand, $arrDescriptorSpec, $arrPipes)) === false) {
                throw new \Exception("error: proc_open() failed!");
            }
    
            $resStream = &$arrPipes[1];
    
            if (($blnSetBlockingResult = stream_set_blocking($resStream, true)) === false) {
                throw new \Exception("error: stream_set_blocking() failed!");
            }
    
            $this->arrProcessQueue[] = array(&$strCommand, &$resProcess, &$resStream);
            return $this;
        }
    
        /**
         * execute any queued commands
         *
         * @return $this
         */
        private function executeCommand()
        {
            while ($this->processQueueLength() < $this->maxProcesses and $this->commandQueueLength() > 0) {
                $strCommand = array_shift($this->arrCommandQueue);
                $this->p_open($strCommand);
            }
            return $this;
        }
    
        /**
         * process close
         *
         * @param array $arrQueueEntry
         * @return $this
         */
        private function p_close(array $arrQueueEntry)
        {
            $resProcess = $arrQueueEntry[1];
            $resStream = $arrQueueEntry[2];
    
            fclose($resStream);
    
            $this->returnValue = proc_close($resProcess);
    
            $this->executeCommand();
            return $this;
        }
    
        /**
         * queue command
         *
         * @param string $strCommand
         * @return $this
         */
        public function queue($strCommand) {
            // put the command on the $arrCommandQueue
            $this->arrCommandQueue[] = $strCommand;
            $this->executeCommand();
            return $this;
        }
    
        /**
         * read from stream
         *
         * @param resource $resStream
         * @return string
         */
        private static function readStream($resStream)
        {
            $result = '';
            while (($line = fgets($resStream)) !== false) {
                $result .= $line;
            }
            return $result;
        }
    
        /**
         * read a result from the process queue
         *
         * @return string|false
         */
        private function readProcessQueue()
        {
            $result = false;
            reset($this->arrProcessQueue);
            while ($result === false && list($key, $arrQueueEntry) = each($this->arrProcessQueue)) {
                $arrStatus = proc_get_status($arrQueueEntry[1]);
                if ($arrStatus['running'] === false) {
                    array_splice($this->arrProcessQueue, $key, 1);
                    $resStream = $arrQueueEntry[2];
                    $result = self::readStream($resStream);
                    $this->p_close($arrQueueEntry);
                }
            }
            return $result;
        }
    
        /**
         * get result from process queue
         *
         * @return string|false
         */
        public function readNext()
        {
            $result = false;
            if ($this->processQueueLength() === 0) {
            } else {
                while ($result === false and $this->processQueueLength() > 0) {
                    $result = $this->readProcessQueue();
                }
            }
            return $result;
        }
    }
    
    set_time_limit(0); // don't timeout
    
    $objParallelProcess = ParallelProcess::load(8); // allow up to 8 parallel processes
    
    for ($i = 0; $i < 4; $i++) {
        $processName = "Process_{$i}";
        echo date('Y-m-d H:i:s') . " - Queue process {$processName}" . PHP_EOL;
        $objParallelProcess->queue("php subtask.php {$processName}"); // queue process
    }
    
    // loop through process queue
    while (($strResponse = $objParallelProcess->readNext()) !== false) { // read next result and run next command if one is queued
        // process response
        echo $strResponse;
    }