
Best way to offload one-shot worker threads in PHP? pthreads? pcntl?


How should I multithread some php-cli code that needs a timeout?

I'm using PHP 5.6 on CentOS 6.6 from the command line.

I'm not very familiar with multithreading terminology or code. I'll simplify the code here, but it is 100% representative of what I want to do.

The non-threaded code currently looks something like this:

$datasets = MyLibrary::getAllRawDataFromDBasArrays();
foreach ($datasets as $dataset) {
    MyLibrary::processRawDataAndStoreResultInDB($dataset);
}
exit; // just for clarity

I need to prefetch all my datasets; processRawDataAndStoreResultInDB() cannot fetch its own dataset. Sometimes processRawDataAndStoreResultInDB() takes too long to process a dataset, so I want to limit the amount of time it has to process each one.

So making it multithreaded would:

  1. Speed it up by allowing multiple processRawDataAndStoreResultInDB() to execute at the same time
  2. Use set_time_limit() to limit the amount of time each one has to process each dataset

Note that the workers don't need to return anything to my main program. This is a simplification, but you can trust that I don't want to collect all the processed datasets and do a single save into the DB after they are all done.

I'd like to do something like:

class MyWorkerThread extends SomeThreadType {
  public function __construct($timeout, $dataset) {
    $this->timeout = $timeout;
    $this->dataset = $dataset;
  }

  public function run() {
    set_time_limit($this->timeout);
    MyLibrary::processRawDataAndStoreResultInDB($this->dataset);
  } 
}

$numberOfThreads = 4;
$pool = new somePoolClass($numberOfThreads);
$pool->start();

$datasets = MyLibrary::getAllRawDataFromDBasArrays();
$timeoutForEachThread = 5; // seconds
foreach ($datasets as $dataset) {
  $thread = new MyWorkerThread($timeoutForEachThread, $dataset);

  $thread->addCallbackOnTerminated(function () use ($thread, $dataset) {
    if ($thread->isTimeout()) {
      MyLibrary::saveBadDatasetToDb($dataset);
    }
  });

  $pool->addToQueue($thread);
}

$pool->waitUntilAllWorkersAreFinished();
exit; // for clarity

From my research, I've found the pthreads PHP extension, which I can use with my thread-safe PHP CLI build. Alternatively, I could use the PCNTL extension or a wrapper library around it (say, Arara/Process).

When I look at them and their examples, though (especially the pthreads pool example), I quickly get confused by the terminology and by which classes I should use to achieve the kind of multithreading I'm looking for.

I wouldn't even mind writing the pool class myself if a thread class gave me isRunning(), isTerminated(), getTerminationStatus() and execute() methods, since the pool would then just be a simple queue.
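For reference, a class with those four methods can be sketched on top of processes instead of threads, using pcntl_fork() and pcntl_waitpid(). ProcessWorker is a hypothetical name (it is not a class from pthreads, pcntl or Arara/Process); the sketch assumes the pcntl extension on the CLI and makes getTerminationStatus() return the child's exit code.

```php
<?php
// Sketch only: ProcessWorker is a hypothetical class, not part of pcntl or
// pthreads. Each instance runs one callback in a forked child process.
class ProcessWorker
{
    private $callback;
    private $pid = null;    // child PID, set by execute()
    private $status = null; // raw wait status, set once the child is reaped

    public function __construct($callback)
    {
        $this->callback = $callback;
    }

    // Fork a child that runs the callback and exits with its return value.
    public function execute()
    {
        $pid = pcntl_fork();
        if ($pid === -1) {
            throw new RuntimeException('pcntl_fork() failed');
        }
        if ($pid === 0) {
            // Child: run the job, then exit so it never returns into the parent's code.
            exit((int) call_user_func($this->callback));
        }
        $this->pid = $pid; // Parent: remember which child to watch.
    }

    // Non-blocking check; reaps the child as soon as it has exited.
    public function isRunning()
    {
        if ($this->pid === null || $this->status !== null) {
            return false;
        }
        $result = pcntl_waitpid($this->pid, $status, WNOHANG);
        if ($result === 0) {
            return true; // child has not exited yet
        }
        if ($result === $this->pid) {
            $this->status = $status;
        }
        return false;
    }

    public function isTerminated()
    {
        return !$this->isRunning() && $this->status !== null;
    }

    // Exit code if the child exited normally; null if it was never started,
    // is still running, or was killed by a signal.
    public function getTerminationStatus()
    {
        if (!$this->isTerminated() || !pcntl_wifexited($this->status)) {
            return null;
        }
        return pcntl_wexitstatus($this->status);
    }
}

// Usage: run one job and poll until it finishes.
$worker = new ProcessWorker(function () {
    return 3; // pretend the job finished with exit code 3
});
$worker->execute();
while ($worker->isRunning()) {
    usleep(50000); // poll every 50 ms
}
echo $worker->getTerminationStatus(), "\n"; // prints 3
```

A pool would then be an array of such workers: start up to N of them, poll isRunning(), and call execute() on the next queued worker whenever a slot frees up.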

Can someone with more experience point me to the library, classes and functions I should use to implement my example above? Or am I taking the wrong approach entirely?

Thanks in advance.


Solution

  • Here is an example using worker processes (not threads), built on the pcntl extension.

    /**
     * Spawns a worker process and returns its PID, or -1
     * if the fork fails.
     *
     * @param callback function, closure or method to call
     * @return integer
     */
    function worker($callback) {
        $pid = pcntl_fork();
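        if($pid === 0) {
            // Child process: any DB work in $callback should open its own
            // connection rather than reuse one inherited from the parent
            exit((int) $callback());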
        } else {
            // Main process or an error
            return $pid;
        }
    }
    
    
    $datasets = array(
        array('test', '123'),
        array('foo', 'bar')
    );
    
    $maxWorkers = 1; // max. concurrent children; raise it (e.g. to 4) to process datasets in parallel
    $numWorkers = 0;
    foreach($datasets as $dataset) {
        $pid = worker(function () use ($dataset) {
            // Do DB stuff here
            var_dump($dataset);
            return 0;
        });
    
        if($pid !== -1) {
            $numWorkers++;
        } else {
            // Handle fork errors here
            echo "Failed to spawn worker\n";
        }
    
        // If $maxWorkers is reached we need to wait
        // for at least one child to return
        if($numWorkers === $maxWorkers) {
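            // $status is passed by reference and holds the raw wait status;
            // pcntl_wexitstatus() extracts the child's exit code from it
            $pid = pcntl_wait($status);
            echo "child process $pid returned " . pcntl_wexitstatus($status) . "\n";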
            $numWorkers--;
        }
    }
    
    // (Non-blocking) wait for the remaining children
    while(true) {
        // $status is passed by reference
        $pid = pcntl_wait($status, WNOHANG);
    
        if($pid === -1) {
            // No children left to wait for (or an error)
            break;
        }
    
        if($pid === 0) {
            // No child has exited yet; be patient ...
            usleep(50000);
            continue;
        }
    
        echo "child process $pid returned " . pcntl_wexitstatus($status) . "\n";
    }
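The code above does not enforce the per-dataset timeout the question asks for. One way to add it, assuming the pcntl extension, is to arm pcntl_alarm() in each child: with no SIGALRM handler installed, the default action terminates the child when the alarm fires, and the parent can detect that with pcntl_wifsignaled() and pcntl_wtermsig(). This also avoids a limitation of set_time_limit(): according to the PHP manual, on non-Windows systems time spent in database queries, stream operations and system calls is not counted toward the limit. worker_with_timeout() and the two stand-in jobs are hypothetical, and throttling with $maxWorkers is left out for brevity.

```php
<?php
// Sketch assuming the pcntl extension; worker_with_timeout() is a
// hypothetical variant of the worker() function above.
function worker_with_timeout($callback, $timeout)
{
    $pid = pcntl_fork();
    if ($pid === 0) {
        // Child: ask the kernel to send SIGALRM after $timeout seconds.
        // No handler is installed, so the default action kills the child,
        // even while it is blocked in a database call.
        pcntl_alarm($timeout);
        exit((int) call_user_func($callback));
    }
    return $pid; // child PID in the parent, -1 if the fork failed
}

// Stand-in jobs: the second one deliberately overruns its 1-second budget.
$jobs = array(
    'fast' => function () { return 0; },
    'slow' => function () { sleep(5); return 0; },
);

$pids = array(); // PID => job name, so each result can be matched to its dataset
foreach ($jobs as $name => $job) {
    $pid = worker_with_timeout($job, 1);
    if ($pid === -1) {
        echo "Failed to spawn worker for $name\n";
        continue;
    }
    $pids[$pid] = $name;
}

$timedOut = array();
// Blocking wait until every child has been reaped (-1 means none are left).
while (($pid = pcntl_wait($status)) > 0) {
    $name = $pids[$pid];
    if (pcntl_wifsignaled($status) && pcntl_wtermsig($status) === SIGALRM) {
        // In the question's code, saveBadDatasetToDb() would be called here.
        $timedOut[] = $name;
        echo "$name timed out\n";
    } elseif (pcntl_wifexited($status)) {
        echo "$name exited with code " . pcntl_wexitstatus($status) . "\n";
    }
}
```

Because the alarm lives in the child, each dataset gets its own independent budget, and the parent only needs the PID-to-dataset map to decide which dataset to record as bad.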