Search code examples
phpratchetphpwebsocketproc-open

Ratchet messages not being sent to socket client


I am developing a terminal on web app for personal use and using Ratchet for the socket server. And It is working fine with simple commands like ls etc. But when I run commands like ping it just doesn't send the content of stdout and stderr to the socket client However even in these commands I am able to see that commands live output in terminal. Here is my implementation

<?php

use Ratchet\MessageComponentInterface;
use Ratchet\ConnectionInterface;

// ini_set('display_errors', 1);
// ini_set('display_startup_errors', 1);
// error_reporting(E_ALL);

require 'vendor/autoload.php'; // Ensure you have Ratchet installed

class TerminalServer implements MessageComponentInterface {
    protected $clients;
    private array $open_processes;

    public function __construct() {
        $this->open_processes = [];
        $this->clients = new \SplObjectStorage;
    }

    public function onOpen(ConnectionInterface $conn) {
        echo "New connection\n";
        $this->clients->attach($conn);
    }

    static function get_output_json( string $message , array $opts = []){
        return json_encode( [ 'data' => $message , 'opts' => $opts ] );
    }

    private function command_exec(ConnectionInterface &$from , string $command){
        
        $descriptorspec = [
            0 => ["pipe", "r"],  // stdin
            1 => ["pipe", "w"],  // stdout
            2 => ["pipe", "w"],  // stderr
        ];
    
        $process = proc_open($command, $descriptorspec, $pipes);
        $this->open_processes[] = $process;
    
        if (is_resource($process)) {

            // Close the input pipe, since we're not sending any input
            fclose($pipes[0]);
    
            // Set the pipes to non-blocking mode
            stream_set_blocking($pipes[1], false);
            stream_set_blocking($pipes[2], false);
    
            while (true) {

                $output = stream_get_contents($pipes[1]);
                $error = stream_get_contents($pipes[2]);
    
                if ($output) {

                    $string_to_send = $this->get_output_json($output);
                    echo "Out: $string_to_send\n";
                    $from->send($string_to_send);

                }
                
                if ($error) {

                    $string_to_send = $this->get_output_json($error);
                    echo "Err: $string_to_send\n";
                    $from->send($string_to_send);

                }

                flush();

                // Check if the process is still running                
                $status = proc_get_status($process);
                if (!$status['running']) {
                    break;
                }
    
                // Small delay to prevent busy-waiting
                usleep(100000); // 100ms
            }
    
            // Close pipes and process
            fclose($pipes[1]);
            fclose($pipes[2]);
            $return_value = $this->command_end($process);

            $from->send($this->get_output_json("Command ended with code $return_value\n", ['exit' => true]));

        } else {

            $from->send($this->get_output_json("Command failed to execute", ['exit' => true]));

        }

    }    

    /**end or terminate a command and return the exit code */
    public static function command_end( $proc ) :int|false {

        if( !is_resource($proc) ){
            return false;
        }

        $status = proc_get_status( $proc );

        if( $status['running'] === true ) {
            proc_terminate($proc , SIGINT);
            return SIGINT;
        } else {
            return proc_close($proc);
        }

    }

    public function onMessage(ConnectionInterface $from, $msg) {
        $command = trim($msg);
        
//        $allowedCommands = ['ls -l', 'pwd', 'whoami'];
    
        if (true || in_array($command, $allowedCommands)) {
            $this->command_exec($from , $command);
        } else {            
            $from->send($this->get_output_json("Command not allowed.\r\n"));
            
        }

    }

    public function onClose(ConnectionInterface $conn) {

        echo "Connection closed\n";

        foreach ($this->open_processes as $p) {
            var_dump($p);
            proc_close($p);
        }
        $this->clients->detach($conn);

    }

    public function onError(ConnectionInterface $conn, \Exception $e) {
        echo "Connection closed because error\n";
        $conn->close();
    }
}

$app = new Ratchet\App('localhost', 2222);
$app->route('/terminal', new TerminalServer, ['*']);
echo "No errors so far\n";
$app->run();

I have tried making the command exit after reading 10 lines by changing the command_exec function and it sends the output all at once after it is exited

private function command_exec(ConnectionInterface &$from , string $command){
        
        $descriptorspec = [
            0 => ["pipe", "r"],  // stdin
            1 => ["pipe", "w"],  // stdout
            2 => ["pipe", "w"],  // stderr
        ];
    
        $process = proc_open($command, $descriptorspec, $pipes);
        $this->open_processes[] = $process;
    
        if (is_resource($process)) {

            // Close the input pipe, since we're not sending any input
            fclose($pipes[0]);
    
            // Set the pipes to non-blocking mode
            stream_set_blocking($pipes[1], false);
            stream_set_blocking($pipes[2], false);

            $i = 0;//count line read

            while (true) {

                $output = stream_get_contents($pipes[1]);
                $error = stream_get_contents($pipes[2]);
    
                if ($output) {

                    $string_to_send = $this->get_output_json($output);
                    echo "Out: $string_to_send\n";
                    $from->send($string_to_send);
                    $i++;

                    if( $i > 10 ){
                        break;
                    }

                }
                
                if ($error) {

                    $string_to_send = $this->get_output_json($error);
                    echo "Err: $string_to_send\n";
                    $from->send($string_to_send);

                }

                flush();

                // Check if the process is still running                
                $status = proc_get_status($process);
                if (!$status['running']) {
                    break;
                }
    
                // Small delay to prevent busy-waiting
                usleep(100000); // 100ms
            }
    
            // Close pipes and process
            fclose($pipes[1]);
            fclose($pipes[2]);
            $return_value = $this->command_end($process);

            $from->send($this->get_output_json("Command ended with code $return_value\n", ['exit' => true]));

        } else {

            $from->send($this->get_output_json("Command failed to execute", ['exit' => true]));

        }

    }

Solution

  • I was able to find what exactly was the problem about , here

    So it was clear that the $from->send($data) method was waiting for ping command to be executed and It has to be done in a non blocking way as shown in the same page

    So I changed the command_exec like following

    
    private function command_exec(ConnectionInterface &$from, string $command){
    
            $descriptorspec = [
                0 => ["pipe", "r"],  // stdin
                1 => ["pipe", "w"],  // stdout
                2 => ["pipe", "w"],  // stderr
            ];
    
            $process = proc_open($command, $descriptorspec, $pipes);
    
            println("Created process");
    
            $this->add_to_open_processes( $from , $process );
            
            if (is_resource($process)) {
    
                fclose($pipes[0]);
               
    
                $stdout_promise = new Deferred();
                $stderr_promise = new Deferred();
    
                $command_end_callback = function () use ($from, $pipes, $process) {
    
                    println("all promise resolved");
    
                    array_map( 'fclose' , array_filter( [$pipes[1] , $pipes[2]] , 'is_resource') );
                    
                    $return_value = $this->command_end($process);
                    
                    $this->send_ln( $from , "Command ended with code $return_value");
    
                };
    
                $main_promise = \React\Promise\all([$stdout_promise->promise() , $stderr_promise->promise()]);
                $main_promise->then($command_end_callback);
    
    
                if (is_resource($pipes[1]) && 'stream' === get_resource_type($pipes[1])) {
    
                    println("stdout is stream");
    
                    $stdout_stream = new Stream($pipes[1]);                
    
                    $this->add_to_open_streams( $from , $stdout_stream);
                    
                    $stdout_stream->on('data', function($data) use ($from){
    
                        $string_to_send = $this->get_output_json($data);
                        println("Out: $string_to_send");
                        $from->send($string_to_send);
    
                    });
    
                    $stdout_stream->on('end', function() use( $stdout_promise ) {
    
                        $stdout_promise->resolve(true);            
                        
                    });
    
                } else {
                    $stdout_promise->resolve(true);            
                }
    
                if (is_resource($pipes[2]) && 'stream' === get_resource_type($pipes[2])) {
    
                    println("stderr is stream");
                    $stderr_stream = new Stream($pipes[2]);
                    
                    $this->add_to_open_streams( $from , $stderr_stream);
                    
                    $stderr_stream->on('data', function($data) use ($from){
                        $string_to_send = $this->get_output_json($data);
                        println("Err: $string_to_send");
                        $from->send($string_to_send);
                    });
    
                    $stderr_stream->on('end', function() use( $stderr_promise ) {
    
                        $stderr_promise->resolve(true);
                        
                    });
    
                } else {
                    $stderr_promise->resolve(true);
                }
    
            }
        }   
    

    I have used \React\Promise\all to end the process of command after STDOUT and STDERR has ended.