
Streaming results in amphp


My goal is to create a pipeline that could have tens of thousands of items in the iterator. Since processing them can take a long time, I want to stream the results back to the client as they are produced.

This is not a web server, it is communication between two systems that I control.

I think I need to create a future for each item, with a closure that writes to the stream, but that seems heavy-handed. I am still experimenting with that, and Pipeline::tap() might actually do the job.

My biggest problem, though, is how to establish the stream. It seemed like I should be able to return a Response to the client with a WritableStream for the body, and keep writing to that stream until all the items are processed.

However, even on the server, the Response only accepts a ReadableStream as its body. To get a WritableStream it seems like I need to use sockets, but then I lose all the useful abstractions of Request and Response. I suppose raw sockets free me from HTTP's requirements, so I could build my own abstractions, but I'd rather not.

http-server is built on sockets, so it seems like this should be possible.
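A readable body does not force the whole result to be built up front: the consumer pulls the next chunk only when it is ready to send it, so producing and sending interleave. The plain-PHP sketch below illustrates that pull model with two hypothetical functions, `produceResults()` standing in for the pipeline and `sendBody()` standing in for the server. It is an analogy only, and it assumes (rather than shows) that http-server consumes a ReadableStream body chunk by chunk, where a pending read suspends a fiber instead of blocking.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for the pipeline: records when each result is produced.
function produceResults(array $items, array &$log): Generator
{
    foreach ($items as $item) {
        $log[] = "produce $item";
        yield json_encode(['item' => $item]) . "\n";
    }
}

// Hypothetical stand-in for the server: pulls one chunk at a time and "sends" it.
function sendBody(iterable $body, array &$log): void
{
    foreach ($body as $chunk) {
        $log[] = 'send ' . rtrim($chunk, "\n");
    }
}

$log = [];
sendBody(produceResults(['a', 'b'], $log), $log);
print_r($log);
```

The log alternates between "produce" and "send" entries: each result goes out before the next one is computed, which is the behaviour a writable body was wanted for.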

Any example of streaming the results of a pipeline back to the client would be super-welcome!

Here is the code I have at the moment. The stream is passed in from the request handler, in the futile hope that it can be returned with the Response before the pipeline completes. This iteration of my code uses tap(), which probably isn't how it's meant to be used:

    $pipeline = Pipeline::fromIterable($bigList)
        ->concurrent(Router::MAX_CONCURRENT_PROCESSES_PER_REQUEST)
        ->unordered()
        ->map(fn (ListItem $item) => $this->getResultForItem($item))
        ->filter(fn (?Result $r) => $r instanceof Result);

    // if there's a stream, don't do anything here that will block until the pipeline is finished.
    if (!is_null($stream)) {
        $streamStarted = false;

        $pipeline->tap(function (Result $r) use ($stream, &$streamStarted) {

            $stream->write($streamStarted ? ',' : '[');
            $streamStarted = true;
            $stream->write(json_encode($r));

        });

        // something here to send the `end()` message when the pipeline has been completed for all items
        //$future = async()

    } else {
        foreach($pipeline as $profile) {
            $indexed[$profile->dsid] = $profile;
        }
    }

If I could attach a WritableStream to the response, I think I'd be in good shape.
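The `end()` step that the comment in the code above is looking for boils down to emitting the closing `]` once the source has no more items. As a plain-PHP sketch (the function name `jsonArrayChunks()` is hypothetical), a generator can do the same `[` / `,` bookkeeping as the tap() closure and add the closing bracket after its loop finishes. It assumes the items are already JSON-encoded strings; feeding it to amphp, for instance through a ReadableIterableStream, is an assumption to check against your versions.

```php
<?php
declare(strict_types=1);

/**
 * Wrap already-encoded JSON values in the punctuation of one JSON array.
 * The closing bracket is only yielded after the source iterable is exhausted.
 *
 * @param iterable<string> $encodedItems
 */
function jsonArrayChunks(iterable $encodedItems): Generator
{
    $first = true;
    foreach ($encodedItems as $json) {
        // '[' before the first item, ',' before every later one.
        yield $first ? '[' : ',';
        $first = false;
        yield $json;
    }
    // Source exhausted: open the array if nothing was yielded, then close it.
    if ($first) {
        yield '[';
    }
    yield ']';
}

$chunks = iterator_to_array(jsonArrayChunks(['{"id":1}', '{"id":2}']), false);
echo implode('', $chunks), "\n";
```

Keep in mind that a client can only `json_decode()` the array once the final `]` has arrived, so this framing streams the bytes but not individually parseable results. Framing each result separately (for example one JSON document per line) is easier to consume as it arrives.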


Solution

  • It doesn't appear that there is a way to open a stream and write to it as I imagined above. Instead, you can create a ReadableStream that pulls from the pipeline, and it does the job automatically.

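    // File-level imports
    use Amp\ByteStream\ReadableIterableStream;
    use Amp\Http\Server\Response;
    use Amp\Pipeline\Pipeline;

    $pipeline = Pipeline::fromIterable($bigList)
        ->concurrent(Router::MAX_CONCURRENT_PROCESSES_PER_REQUEST)
        ->unordered()
        ->map(fn (ListItem $item) => $this->getResultForItem($item))
        ->filter(fn (?Result $r) => $r instanceof Result)
        // One JSON document per line, so the client can tell where each result ends.
        ->map(fn (Result $r) => json_encode($r) . "\n");

    $stream = new ReadableIterableStream($pipeline->getIterator());
    return new Response(200, [], $stream);

That writes each Result to the stream as soon as it is available, one JSON document per line (json_encode() does not emit raw newlines unless JSON_PRETTY_PRINT is used). The ReadableStream is returned as the body of the HTTP response, and new data becomes available to the client whenever the pipeline finishes an item in $bigList.

The client then reads the body incrementally. A single read() chunk is not guaranteed to contain exactly one result, so any incomplete line is kept in a buffer until the rest arrives:

    $body = $response->getBody();
    $results = [];
    $buffer = '';
    while (null !== $chunk = $body->read()) {
        $lines = explode("\n", $buffer . $chunk);
        $buffer = array_pop($lines); // incomplete last line waits for the next chunk
        foreach ($lines as $line) {
            $results[] = json_decode($line);
        }
    }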
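Because `read()` returns whatever bytes happen to have arrived, a chunk can hold part of a result or several results at once, so decoding chunk by chunk only works by luck. A common remedy is newline-delimited JSON: the server ends each `json_encode()` output with `"\n"` (json_encode() escapes newlines inside strings, so a raw newline only ever appears as the delimiter), and the client decodes only complete lines. The plain-PHP sketch below uses a hypothetical `decodeJsonLines()` generator; the array of chunks stands in for successive `read()` calls.

```php
<?php
declare(strict_types=1);

/**
 * Decode newline-delimited JSON from arbitrarily split chunks.
 * Yields each value as soon as its terminating newline has arrived.
 *
 * @param iterable<string> $chunks e.g. successive results of read()
 */
function decodeJsonLines(iterable $chunks): Generator
{
    $buffer = '';
    foreach ($chunks as $chunk) {
        $buffer .= $chunk;
        // Consume every complete line; a trailing partial line stays buffered.
        while (($pos = strpos($buffer, "\n")) !== false) {
            $line = substr($buffer, 0, $pos);
            $buffer = substr($buffer, $pos + 1);
            if ($line !== '') {
                yield json_decode($line, true, 512, JSON_THROW_ON_ERROR);
            }
        }
    }
    if (trim($buffer) !== '') {
        // Stream ended without a final newline: decode whatever is left.
        yield json_decode($buffer, true, 512, JSON_THROW_ON_ERROR);
    }
}

// Chunk boundaries deliberately fall in the middle of a value.
$chunks = ['{"id":1}' . "\n" . '{"i', 'd":2}' . "\n", '{"id":3}' . "\n"];
$results = iterator_to_array(decodeJsonLines($chunks), false);
var_dump($results === [['id' => 1], ['id' => 2], ['id' => 3]]); // bool(true)
```

With amphp, the chunk source could be a small generator such as `(function () use ($body) { while (null !== $chunk = $body->read()) { yield $chunk; } })()`, assuming `$body` is the response body stream; each decoded result is then available as soon as its line is complete.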