Search code examples
phpsocketsstreamnetwork-programmingstreaming

How to listen muliple TCP streams continously by stream_get_contents()?


By Default stream_get_contents waits and listen for 60 seconds. if i have multiple streams and want to listen all of them continuously.

Inside foreach if i am listening one stream, i can't listen other ones.

What is the solution to listen and capture stream output for all streams continuously?

while(true){
   //$streamArray is an array of streams obtained by stream_socket_client("tcp://..);
   foreach($streamArray as $stream){
        fputs($stream,$command);
        stream_get_contents($stream);  // and update file/DB (Blocking call)
        }
    }

Note: For every stream i have already done stream_set_blocking( $stream , true );

Update:

My requirement is to listen all streams for some time say 30 mints. at the same time i can't listen 2 stream. if i have 5 streams, my code is just time division multiplexing, out of 30 mints each individual stream will be recorded only for 6 mints

I have one solution that make AJAX request for individual stream and record independently. Certainly i don't want to do this multiple AJAX call method as it will result in more code as well as more CPU.


Solution

  • with stream_set_blocking($resource, true) you start reading the stream(s) synchronously, which means each call to fread() waits until there is data to read. You then invoke stream_get_contents(), which reads from the blocking stream until it reaches EOF (is closed). The result is that you read one stream after another and not "simultaneously".

    Reading streams this way is common and the easiest to code, but when you want to handle multiple streams simultaneously, you must handle the buffers, timing and "end of stream" detection yourself. With your current code this part is abstracted away from you via blocking streams and stream_get_contents().

    To read multiple streams simultaneously, you should rewrite your code to read the stream(s) asynchroneously.

    // $streamArray is an array of streams obtained by stream_socket_client("..");
    
    $buffers = array();
    $num_streams = 0;
    foreach($streamArray as $stream) {
        // set streams non-blocking for asynchroneous reading
        stream_set_blocking($stream, false);
        // write command to all streams - multiplexing
        fputs($stream, $command);
        // initialize buffers
        $buffers[$stream] = '';
        $num_streams++;
    }
    while($num_streams > 0) {
        // use stream_select() to wait for new data and not use CPU at 100%.
        // note that the stream arrays are passed by reference and are modified
        // by stream_select() to reflect which streams are modified / have a
        // new event. Because of this we use a copy of the original stream array.
        // also note that due to a limitation in ZE you can not pass NULL directly
        // for more info: read the manual    https://php.net/stream_select
        $no_stream = NULL;
        $select_read = $streamArray;
        stream_select($select_read, $no_stream, $no_stream, null);
    
        // if there is new data, read into the buffer(s)
        foreach($select_read as $stream) {
            $buffers[$stream] .= fread($stream, 4096);
            // check if the stream reached end
            if (feof($stream)) {
                $key = array_search($stream, $streamArray);
                // close stream properly
                $num_streams--;
                fclose($stream);
                // remove stream from array of open streams
                unset($streamArray[$key]);
            }
        }
    }
    // do something with your buffers - our use them inside the while() loop already
    print_r($buffers);