How do I build an Amphp pool/queue of multiple requests? And where is the Curl handler?


Here is some example/test code written with GuzzleHttp:

use GuzzleHttp\Client;
use GuzzleHttp\Handler\CurlHandler;
use GuzzleHttp\HandlerStack;
use GuzzleHttp\Middleware;
use GuzzleHttp\Pool;
use Psr\Http\Message\ResponseInterface;

require __DIR__ . '/vendor/autoload.php';

$handler = new CurlHandler();

$stack = new HandlerStack($handler);
$stack->push(Middleware::httpErrors(), 'http_errors');
$stack->push(Middleware::redirect(), 'allow_redirects');
$stack->push(Middleware::cookies(), 'cookies');
$stack->push(Middleware::prepareBody(), 'prepare_body');
$interval = 100;
$concurrency = 50;
$counter = 0; // shared by reference between the fulfilled/rejected callbacks
$client = new Client(['handler' => $stack]);
echo sprintf("Using Guzzle handler %s\n", get_class($handler));
echo sprintf("Printing memory usage every %d requests\n", $interval);
echo "Fetching package list... ";

$packageNames = json_decode(
    $client->get('https://packagist.org/packages/list.json')
           ->getBody()
           ->getContents()
)->packageNames;

if (empty($packageNames)) {
    echo "Empty result. No reason to continue.";
    return;
}

echo 'done. (' . count($packageNames) . " packages)\n\n";

$requests = function($packageNames) {
    foreach ($packageNames as $packageVendorPair) {
        yield new GuzzleHttp\Psr7\Request('GET', "https://packagist.org/p/{$packageVendorPair}.json");
    }
};

$pool = new Pool($client, $requests($packageNames), [
    'concurrency' => $concurrency,
    'fulfilled' => function (ResponseInterface $response, $index) use (&$counter, $interval) {
        $counter++;
        if ($counter % $interval === 0) {
            echo sprintf(
                "Processed %s requests. Memory used: %s MB\n",
                number_format($counter),
                number_format(memory_get_peak_usage()/1024/1024, 3)
            );
        }
    },
    'rejected' => function($reason, $index) use (&$counter, $interval)
    {
        $counter++;
        if ($counter % $interval === 0) {
            echo sprintf(
                "Processed %s requests. Memory used: %s MB\n",
                number_format($counter),
                number_format(memory_get_peak_usage()/1024/1024, 3)
            );
        }
    }
]);

$promise = $pool->promise();
$response = $promise->wait();

How can I do something similar with Amphp or Artax? I searched the Amp docs and Stack Overflow, but couldn't find anything comparable.

By the way, I've also found that Amp doesn't use Curl as a handler, and I don't understand why that option isn't available. Can it be added manually, or is there something better that replaces Curl's functionality (custom headers, debug/verbose output, and so on)?

The specific points where I need help:

  1. Can someone point me to a pool-equivalent example built with the Amp framework or any of its libraries, or show one, even as a simpler example?
  2. Where is the Curl handler in Amp? Can I use it, and how?

The Amphp website says:

The Stack Overflow Community can answer your question if it's generic enough. Use the amphp tag so the right people find your question.

Since I provided a simple enough (and working) example, I thought it would be easy to understand exactly what I need.

With all due respect.


Solution

  • There's no pool equivalent, but it can be written using a semaphore and async coroutines.

    <?php
    
    use Amp\Artax\DefaultClient;
    use Amp\Loop;
    use Amp\Sync\LocalSemaphore;
    
    require __DIR__ . "/vendor/autoload.php";
    
    Loop::run(function () {
        $concurrency = 10;
        $client = new DefaultClient;
        $semaphore = new LocalSemaphore($concurrency);
    
        $packageResponse = yield $client->request("https://packagist.org/packages/list.json");
        $packageNames = json_decode(yield $packageResponse->getBody())->packageNames;
    
        $requestHandler = Amp\coroutine(function ($package) use ($client) {
            $url = "https://packagist.org/p/{$package}.json";
    
            $response = yield $client->request($url);
            $body = yield $response->getBody();
    
            return $body;
        });
    
        $counter = 0;
    
        foreach ($packageNames as $package) {
            $lock = yield $semaphore->acquire();
    
            $promise = $requestHandler($package);
            $promise->onResolve(function ($error, $body) use (&$counter, $lock) {
                $lock->release();
    
                if (++$counter % 50 === 0) {
                    echo sprintf(
                        "Processed %s requests. Memory used: %s MB\n",
                        number_format($counter),
                        number_format(memory_get_peak_usage()/1024/1024, 3)
                    );
                }
            });
        }
    });
    

    This example uses LocalSemaphore, an implementation of Amp\Sync\Semaphore. The semaphore limits concurrency: each request must acquire a lock before it starts, and the loop waits at acquire() whenever all locks are taken.
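
To see the semaphore's effect in isolation, here is a minimal sketch that swaps the HTTP requests for short timers and records how many tasks are in flight at once. It assumes Amp v2 with amphp/sync installed via Composer; the task count, the limit of 3, and the 20 ms delay are arbitrary illustration values, not anything from the question.

```php
<?php

use Amp\Delayed;
use Amp\Loop;
use Amp\Sync\LocalSemaphore;
use function Amp\call;
use function Amp\Promise\all;

require __DIR__ . '/vendor/autoload.php';

$maxInFlight = 0; // highest number of tasks observed running at the same time

Loop::run(function () use (&$maxInFlight) {
    $semaphore = new LocalSemaphore(3); // illustrative limit: 3 concurrent tasks
    $inFlight = 0;
    $promises = [];

    foreach (range(1, 10) as $id) {
        // Suspends this coroutine until one of the 3 locks is free.
        $lock = yield $semaphore->acquire();

        $promises[] = call(function () use ($id, $lock, &$inFlight, &$maxInFlight) {
            try {
                $inFlight++;
                $maxInFlight = max($maxInFlight, $inFlight);
                yield new Delayed(20); // stand-in for an HTTP request
                return $id;
            } finally {
                // Always free the slot, even if the task fails.
                $inFlight--;
                $lock->release();
            }
        });
    }

    // Wait for the tasks that are still running after the loop ends.
    $results = yield all($promises);
    echo count($results) . " tasks done, peak concurrency: {$maxInFlight}\n";
});
```

Acquiring the lock before starting each task is what throttles the foreach loop, and releasing it in a finally block keeps a failed task from permanently occupying a slot.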

    There's no Curl handler in Amp because Curl doesn't integrate well with event loops. Curl has its own event loop, but that loop only drives multiple concurrent HTTP requests, not any other non-blocking I/O. That's why Artax implements HTTP on top of raw PHP sockets, without any dependency on Curl.
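
On the custom-header and debugging part of the question, here is a hedged sketch of what this can look like without Curl. It assumes Artax v3, where a Request object can be passed to the client in place of a URL string. The User-Agent value is made up for illustration.

```php
<?php

use Amp\Artax\DefaultClient;
use Amp\Artax\Request;
use Amp\Loop;

require __DIR__ . '/vendor/autoload.php';

// Request objects are immutable: each withHeader() call returns a new copy.
$request = (new Request('https://packagist.org/packages/list.json'))
    ->withHeader('User-Agent', 'example-crawler/1.0') // hypothetical value
    ->withHeader('Accept', 'application/json');

Loop::run(function () use ($request) {
    $client = new DefaultClient;

    $response = yield $client->request($request);

    // Rough substitute for Curl's verbose mode: print the status line and headers.
    echo $response->getStatus() . ' ' . $response->getReason() . "\n";
    foreach ($response->getHeaders() as $name => $values) {
        echo $name . ': ' . implode(', ', $values) . "\n";
    }
});
```

This is not a full replacement for Curl's verbose output, but the request and response objects expose the same information programmatically.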