Time sleep in response stream with ReactPHP


I'm playing with ReactPHP and response streaming.

I've successfully created a POC to generate a response stream like this:

function (int $chunks, int $sleep) use ($loop) {
    $stream = new ThroughStream();
    $loop->addPeriodicTimer($sleep, function (TimerInterface $timer) use ($stream, $chunks, $loop) {
        static $i = 0;
        $stream->write(microtime(true) . PHP_EOL);
        $i++;
        if ($i >= $chunks) {
            $loop->cancelTimer($timer);
            $stream->end();
        }
    });
    return new Response(200, ['Content-Type' => 'text/plain'], $stream);
}

Calling curl -X GET -i https://localhost:9091/sleepstream/10/1 produces:

1624222419.1271
1624222420.1282
1624222421.1293
1624222422.1302
1624222423.1312
1624222424.1323
1624222425.1333
1624222426.1342
1624222427.1353
1624222428.1363

Each line is printed 1 second after the previous one. Nice.

Now I'm trying to create a more realistic controller:

function (int $sleep, ServerRequestInterface $request) use ($loop) {
    $body = $request->getBody();
    assert($body instanceof \React\Stream\ReadableStreamInterface);
                    
    $in = new \Clue\React\NDJson\Decoder($body);
                    
    $stream = new ThroughStream();

    $in->on('data', function ($data) use ($stream, $loop) {
        $data->ts = time();
        $loop->futureTick(function () use ($stream, $data) {
            echo "DATA\n";
            $stream->write(\json_encode($data) . PHP_EOL);
            sleep(1);
        });
    });
                    
    $in->on('end', function() use ($stream, $loop) {
        $loop->addTimer(2, function () use ($stream) {
            $stream->end();
        });
    });
    return new Response(200, ['Content-Type' => 'text/plain'], $stream);
}

I'm using an NDJSON input file users.ndjson:

{"id":1,"name":"Alice"}
{"id":2,"name":"Bob"}
{"id":3,"name":"Carol"}
{"id":4,"name":"David"}
{"id":5,"name":"Zach"}

So calling that:

curl -D /dev/stderr -s -X PUT -T contrib/users.ndjson https://localhost:9091/bridge/ndjson/1

gives that:

{"id":1,"name":"Alice","ts":1624231062}
{"id":2,"name":"Bob","ts":1624231062}
{"id":3,"name":"Carol","ts":1624231062}
{"id":4,"name":"David","ts":1624231062}
{"id":5,"name":"Zach","ts":1624231062}

But the response is not received line by line, one per second; it arrives all in one shot...

The server log displays:

DATA
DATA
DATA
DATA
DATA

with the right timing (one line per second).

I can't understand why response streaming is broken in the second controller.

EDIT: I'm starting to understand why response streaming is broken in the second controller.

@WyriHaximus pointed out that sleep() is blocking the loop. So my question is now: how can I yield to the loop once a record is ready to be streamed, and still emulate a sleep? (I'm trying to emulate computation lag between the input stream and the output stream to validate some key points.)


Solution

  • ReactPHP core maintainer here. The sleep(1); in your code blocks the event loop for a full second. While the loop is blocked it can't write the data out, so the queued writes are only flushed once control returns to the loop. Also, if you add the time() call for debugging, you might want to use microtime(true) instead to better visualize the time between queued writes. Unless I'm missing something and you have a very good reason, you generally don't want to delay writing out data after processing it, as it holds memory you don't need to reserve.
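
If the goal is only to emulate computation lag, one option is to replace the blocking sleep() with a loop timer, so the loop stays free to flush each write as it happens. The following is a minimal sketch, not a definitive implementation: delayedForward() is a hypothetical helper name, it assumes react/event-loop and react/stream are installed via Composer, and it assumes each record is an object (as the NDJSON decoder in the question emits).

```php
<?php
// Assumption: `composer require react/event-loop react/stream` has been run.
require __DIR__ . '/vendor/autoload.php';

use React\EventLoop\LoopInterface;
use React\Stream\ReadableStreamInterface;
use React\Stream\ThroughStream;

/**
 * Hypothetical helper: forwards each record from $in to the returned stream
 * after an emulated processing delay, without blocking the event loop.
 */
function delayedForward(LoopInterface $loop, ReadableStreamInterface $in, float $delay): ThroughStream
{
    $out = new ThroughStream();
    $pending = 0;        // records scheduled but not yet written
    $inputEnded = false; // true once the input has emitted "end"
    $nextAt = 0.0;       // time at which the most recently queued record is due

    $in->on('data', function ($data) use ($loop, $out, $delay, &$pending, &$inputEnded, &$nextAt) {
        $now = microtime(true);
        // Schedule each record one $delay after the previous one (or after now).
        $nextAt = max($nextAt, $now) + $delay;
        $pending++;

        // addTimer() returns immediately; the callback runs later on the loop.
        $loop->addTimer($nextAt - $now, function () use ($out, $data, &$pending, &$inputEnded) {
            $data->ts = microtime(true); // assumes $data is an object
            $out->write(json_encode($data) . PHP_EOL);

            // Close the output only after the last queued record is written.
            if (--$pending === 0 && $inputEnded) {
                $out->end();
            }
        });
    });

    $in->on('end', function () use ($out, &$pending, &$inputEnded) {
        $inputEnded = true;
        if ($pending === 0) {
            $out->end(); // nothing left in flight
        }
    });

    return $out;
}
```

In the second controller, the 'data' and 'end' handlers could then be replaced by $stream = delayedForward($loop, $in, $sleep); with $stream passed to the Response as before. The pending counter replaces the fixed 2-second timer from the question, so the response ends right after the last delayed record rather than at a guessed time.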