How should I multithread some php-cli code that needs a timeout?
I'm using PHP 5.6 on Centos 6.6 from the command line.
I'm not very familiar with multithreading terminology or code. I'll simplify the code here but it is 100% representative of what I want to do.
The non-threaded code currently looks something like this:
$datasets = MyLibrary::getAllRawDataFromDBasArrays();
foreach ($datasets as $dataset) {
MyLibrary::processRawDataAndStoreResultInDB($dataset);
}
exit; // just for clarity
I need to prefetch all my datasets, and each processRawDataAndStoreResultInDB() cannot fetch it's own dataset. Sometimes processRawDataAndStoreResultInDB() takes too long to process a dataset, so I want to limit the amount of time it has to process it.
So you can see that making it multithreaded would
Notice that I don't need to come back to my main program. Since this is a simplification, you can trust that I don't want to collect all the processed datasets and do a single save into the DB after they are all done.
I'd like to do something like:
class MyWorkerThread extends SomeThreadType {
public function __construct($timeout, $dataset) {
$this->timeout = $timeout;
$this->dataset = $dataset;
}
public function run() {
set_time_limit($this->timeout);
MyLibrary::processRawDataAndStoreResultInDB($this->dataset);
}
}
$numberOfThreads = 4;
$pool = somePoolClass($numberOfThreads);
$pool->start();
$datasets = MyLibrary::getAllRawDataFromDBasArrays();
$timeoutForEachThread = 5; // seconds
foreach ($datasets as $dataset) {
$thread = new MyWorkerThread($timeoutForEachThread, $dataset);
$thread->addCallbackOnTerminated(function() {
if ($this->isTimeout()) {
MyLibrary::saveBadDatasetToDb($dataset);
}
}
$pool->addToQueue($thread);
}
$pool->waitUntilAllWorkersAreFinished();
exit; // for clarity
From my research online I've found the PHP extension pthreads which I can use with my thread-safe php CLI, or I could use the PCNTL extension or a wrapper library around it (say, Arara/Process)
When I look at them and their examples though (especially the pthreads pool example) I get confused quickly by the terminology and which classes I should use to achieve the kind of multithreading I'm looking for.
I even wouldn't mind creating the pool class myself, if I had a isRunning(), isTerminated(), getTerminationStatus() and execute() function on a thread class, as it would be a simple queue.
Can someone with more experience please direct me to which library, classes and functions I should be using to map to my example above? Am I taking the wrong approach completely?
Thanks in advance.
Here comes an example using worker processes. I'm using the pcntl extension.
/**
* Spawns a worker process and returns it pid or -1
* if something goes wrong.
*
* @param callback function, closure or method to call
* @return integer
*/
function worker($callback) {
$pid = pcntl_fork();
if($pid === 0) {
// Child process
exit($callback());
} else {
// Main process or an error
return $pid;
}
}
$datasets = array(
array('test', '123'),
array('foo', 'bar')
);
$maxWorkers = 1;
$numWorkers = 0;
foreach($datasets as $dataset) {
$pid = worker(function () use ($dataset) {
// Do DB stuff here
var_dump($dataset);
return 0;
});
if($pid !== -1) {
$numWorkers++;
} else {
// Handle fork errors here
echo 'Failed to spawn worker';
}
// If $maxWorkers is reached we need to wait
// for at least one child to return
if($numWorkers === $maxWorkers) {
// $status is passed by reference
$pid = pcntl_wait($status);
echo "child process $pid returned $status\n";
$numWorkers--;
}
}
// (Non blocking) wait for the remaining childs
while(true) {
// $status is passed by reference
$pid = pcntl_wait($status, WNOHANG);
if(is_null($pid) || $pid === -1) {
break;
}
if($pid === 0) {
// Be patient ...
usleep(50000);
continue;
}
echo "child process $pid returned $status\n";
}