Tags: php, parallel-processing, php-7, php-7.4, amphp

AMPHP - Queueing more Tasks than available Workers in Pool


I have a project in which I am converting a large number of .tif images into PDF documents. The file count runs into the millions.

To speed up the process I am using Amphp. Since converting the images with ImageMagick takes considerable CPU power, I want to limit the maximum number of converter processes running in parallel.

My first approach works, but it could be improved by queueing the files individually instead of handing each of a fixed number of workers an array of x files.
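The difference between the two approaches can be sketched in plain PHP. This is only an illustration: the file names and `$chunkSize` are hypothetical stand-ins for the real directory contents and THREAD_CHUNKSIZE, and no Amphp code is involved yet.

```php
<?php
// Hypothetical sample input; the real script reads file names from LOOKUP_PATH.
$files = ['a.tif', 'b.tif', 'c.tif', 'd.tif', 'e.tif'];
$chunkSize = 2; // stand-in for THREAD_CHUNKSIZE

// Batch approach: every worker receives a fixed array of files up front,
// so a worker stuck with slow files cannot hand any of them to an idle one.
$batches = array_chunk($files, $chunkSize);

// Queued approach: one unit of work per file, so whichever worker
// becomes free next can pick up the next file.
$queue = array_map(function (string $file): array {
    return [$file];
}, $files);

printf("%d batches, %d queued items\n", count($batches), count($queue));
// prints "3 batches, 5 queued items"
```

Each entry of `$queue` has the same shape as the first argument passed to ConvertPdfTask in the code below.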

This is my current code, where I tried to replicate the example.

<?php
require dirname(__DIR__) . '/vendor/autoload.php';

$constants = get_defined_constants(true);
$constants = $constants['user'];
$maxFileCount = THREAD_CHUNKSIZE * THREAD_COUNT;
$i = 0;
$folder = opendir(LOOKUP_PATH);
$tasks = [];

while ($i < $maxFileCount && (false !== ($import_file = readdir($folder)))) {
    $fileParts = explode('.', $import_file);
    $ext = strtolower(end($fileParts));
    if($ext === 'xml') {
        $filePath = LOOKUP_PATH. 'xml'.DIRECTORY_SEPARATOR.$import_file;
        $tasks[] = new ConvertPdfTask([$filePath], $constants);
    }
    $i++;
}
if(!empty($tasks)) {
    Amp\Loop::run(function () use ($tasks) {
        $coroutines = [];
        $pool = new Amp\Parallel\Worker\DefaultPool(THREAD_COUNT);
        foreach ($tasks as $index => $task) {
            $coroutines[] = Amp\call(function() use ($pool, $task) {
                return yield $pool->enqueue($task);
            });
        }
        $results = yield Amp\Promise\all($coroutines);

        return yield $pool->shutdown();
    });
}

My problem is that as soon as I enqueue more than THREAD_COUNT tasks, no PDFs are created and I get the following PHP warning: Warning: Worker in pool exited unexpectedly with code -1

As long as I stay below the maximum pool size, everything is fine.

I am using PHP 7.4.9 on Windows 10 and amphp/parallel 1.4.0.


Solution

  • After some more experimenting I found a solution that seems to work. It feels a bit "hacky", so if anyone has a better idea, please share. I thought the pool would automatically build up a queue that is then worked through by the maximum number of workers, but that does not seem to be the case.

    I now save the coroutines that I get from Amp\call in two separate arrays: one that holds all coroutines, and one that holds only those of the current loop (the ones started since the last wait).

    $coroutine = Amp\call(function () use ($pool, $task) {
        return yield $pool->enqueue($task);
    });
    $loopRoutines[] = $coroutine;
    $allCoroutines[] = $coroutine;
    

    After enqueueing an item I check whether I have already reached the maximum number of configured threads. If the pool has the maximum number of workers and none of them is idle, I call Amp\Promise\first on my current-loop coroutines to wait until a worker becomes idle.

    Since the function would return instantly the next time I get there (because the finished coroutine would still be in my current-loop array), I clear the array.

    if ($pool->getWorkerCount() >= (THREAD_COUNT) && $pool->getIdleWorkerCount() === 0) {
        yield Amp\Promise\first($loopRoutines);
        $loopRoutines = [];
    }
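
    The reason for clearing the array can be shown in isolation. The following is a minimal sketch, assuming amphp/amp 2.x is installed under the same vendor path as in the script above. Amp\Success and Amp\Delayed are only stand-ins for a coroutine that has already finished and one that is still running; they are not part of the actual converter script.

    ```php
    <?php
    require dirname(__DIR__) . '/vendor/autoload.php';

    use Amp\Delayed;
    use Amp\Loop;
    use Amp\Success;
    use function Amp\Promise\first;

    $winner = null;

    Loop::run(function () use (&$winner) {
        // Stand-in for a coroutine that finished during an earlier wait.
        $finished = new Success('finished earlier');
        // Stand-in for a task that is still being processed (resolves after 500 ms).
        $running = new Delayed(500, 'still running');

        // first() resolves right away, because one promise in the array is
        // already resolved. No worker has actually become free at this point.
        $winner = yield first([$finished, $running]);
        echo $winner, PHP_EOL;
    });
    ```

    With the already finished promise left in the array, the wait does not throttle anything, which is why the array is reset after each call to Amp\Promise\first.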
    

    After the foreach I call Amp\Promise\all on my all-coroutines array, so the script waits until all workers have finished.

    Here is my changed code:

    <?php
    require dirname(__DIR__) . '/vendor/autoload.php';
    
    $constants = get_defined_constants(true);
    $constants = $constants['user'];
    $maxFileCount = THREAD_CHUNKSIZE * THREAD_COUNT;
    $i = 0;
    $folder = opendir(LOOKUP_PATH);
    $tasks = [];
    
    while ($i < $maxFileCount && (false !== ($import_file = readdir($folder)))) {
        $fileParts = explode('.', $import_file);
        $ext = strtolower(end($fileParts));
        if($ext === 'xml') {
            $filePath = LOOKUP_PATH. 'xml'.DIRECTORY_SEPARATOR.$import_file;
            $tasks[] = new ConvertPdfTask([$filePath], $constants);
        }
        $i++;
    }
    if(!empty($tasks)) {
        Amp\Loop::run(function () use ($tasks) {
            $allCoroutines = [];
            $loopRoutines = [];
            $pool = new Amp\Parallel\Worker\DefaultPool(THREAD_COUNT);
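            foreach ($tasks as $index => $task) {
                // Wrap each enqueue call in a coroutine so there is a promise to wait on.
                $coroutine = Amp\call(function () use ($pool, $task) {
                    return yield $pool->enqueue($task);
                });
                $loopRoutines[] = $coroutine;
                $allCoroutines[] = $coroutine;
                // Pool is at its maximum size and no worker is idle: wait until one task finishes.
                if ($pool->getWorkerCount() >= THREAD_COUNT && $pool->getIdleWorkerCount() === 0) {
                    yield Amp\Promise\first($loopRoutines);
                    // Reset, otherwise the finished coroutine would make first() return instantly next time.
                    $loopRoutines = [];
                }
            }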
            yield Amp\Promise\all($allCoroutines);
    
            return yield $pool->shutdown();
        });
    }
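
    A possible alternative to tracking two arrays is to guard the enqueue call with a semaphore. The following is a sketch under stated assumptions, not a verified fix for the warning above: it assumes amphp/sync 1.x is available (amphp/parallel should already pull it in as a dependency), MAX_PARALLEL is a hypothetical stand-in for THREAD_COUNT, and CallableTask with strtoupper stands in for ConvertPdfTask so that the example is self-contained. It has not been tested on the Windows setup described in the question.

    ```php
    <?php
    require dirname(__DIR__) . '/vendor/autoload.php';

    use Amp\Loop;
    use Amp\Parallel\Worker\CallableTask;
    use Amp\Parallel\Worker\DefaultPool;
    use Amp\Sync\LocalSemaphore;
    use function Amp\Promise\all;

    const MAX_PARALLEL = 2; // hypothetical stand-in for THREAD_COUNT

    // Stand-in tasks; the real script would create ConvertPdfTask objects here.
    $tasks = [];
    foreach (['a.tif', 'b.tif', 'c.tif', 'd.tif', 'e.tif'] as $file) {
        $tasks[] = new CallableTask('strtoupper', [$file]);
    }

    $results = [];

    Loop::run(function () use ($tasks, &$results) {
        $pool = new DefaultPool(MAX_PARALLEL);
        $semaphore = new LocalSemaphore(MAX_PARALLEL);
        $pending = [];

        foreach ($tasks as $task) {
            // Suspends here while MAX_PARALLEL tasks are already in flight.
            $lock = yield $semaphore->acquire();

            $promise = $pool->enqueue($task);
            // Free the slot once the task settles, whether it succeeded or failed.
            $promise->onResolve(function () use ($lock) {
                $lock->release();
            });
            $pending[] = $promise;
        }

        // Wait for the remaining tasks, then stop the workers.
        $results = yield all($pending);
        yield $pool->shutdown();
    });
    ```

    The semaphore keeps at most MAX_PARALLEL tasks enqueued at any time, so the pool is never handed more tasks than it has workers, which is the same condition the getWorkerCount/getIdleWorkerCount check enforces.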