
Using reactive PHP in a blocking application


I'm currently working on a PHP application that will be using some websocket connections to talk to another service.

To talk to this websocket service, we are using Ratchet, a PHP library built on ReactPHP.

This piece of code needs to send and respond to a couple of requests, and after that, should return the information to the "main thread".

Example flow:

HTTP request -> controller -> starts a service which opens a websocket client -> websocket client talks to the server -> once it's done, it should return the outcome to the controller code -> controller outputs to user

The issue I'm having is that I'm not familiar with Reactive PHP and am not sure how to handle this.

I've tried:

    $service = new WebsocketService();
    $startTimer = time();
    $service->getList(44);
    while ($service->getResponse() === null) {
        usleep(500); // Sleeps for 500 microseconds
        if (time() > $startTimer + 10) {
            break; // Time out after 10 seconds
        }
    }
    var_dump($service->getResponse());

The service code sets its "response" variable to something other than null once it's done. This obviously fails, because the sleep call blocks the thread. Even without the sleep, the while loop itself seems to block I/O, so the reactive code never gets to run.

A solution would be to open up a new thread and run the websocket code there, but I wouldn't be happy with that.

I feel like I need to implement some sort of "watcher" around the websocket process, but I'm not sure how to do that.

Our websocket service client code looks like this:

private $response = null;

/**
 * @return null|object
 */
public function getResponse() {
    return $this->response;
}

public function getList($accountId) {
    $this->response = null;
    \Ratchet\Client\connect('ws://192.168.56.1:8080')->then(function(\Ratchet\Client\WebSocket $conn) use ($accountId) {
        $login = new \stdClass();
        $login->action = 'login';
        $conn->on('message', function($msg) use ($conn, $login, $accountId) {
            try {
                $response = json_decode($msg);
                if ($response->result_id == 100) {
                    //Successfully logged in to websocket server

                    //Do our request now.
                    $message = new \stdClass();
                    $message->target = 'test';
                    $conn->send(json_encode($message));
                }

                if (isset($response->reply) && $response->reply == 'list') {
                    $this->response = $response; //This is the content I need returned in the controller
                    $conn->close(); //Don't need it anymore
                }

            } catch (\Exception $e) {
                echo 'response exception!';
                //Do nothing for now
            }
        });

        $conn->send(json_encode($login));
    }, function ($e) {
        echo "Could not connect: {$e->getMessage()}\n";
    });
}

Running the code like this does not work either:

    $service = new WebsocketService();
    $service->getList(44);
    echo 'Test';
    var_dump($service->getResponse());

because the "Test" echo (and the var_dump) runs before I even get a response from the websocket server.

Please, enlighten me! I'm not sure what to search for.


Solution

  • PHP and websockets still seem to be a bit experimental. Nevertheless, I found a great tutorial on medium.com by Adam Winnipass that should help with your problem: https://medium.com/@winni4eva/php-websockets-with-ratchet-5e76bacd7548

    The only difference is that the tutorial implements its websocket client in JavaScript instead of PHP. That should not matter much, though: once the websocket connection is open, both ends have to send messages and wait to receive them. The tutorial illustrates this with a figure:

    [Figure: Establishing a Web Socket connection]

    One way to create a working websocket connection seems to be to implement the MessageComponentInterface

    use Ratchet\MessageComponentInterface;
    

    which also requires

    use Ratchet\ConnectionInterface;
    

    The message component interface defines the following methods:

    • onOpen
    • onMessage
    • onClose
    • onError
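
    As a rough illustration of those four methods, here is a minimal sketch of a component that echoes every message back to its sender. The class name EchoComponent, the echo behaviour, and the autoload path are assumptions made for this example; they are not taken from the tutorial.

    ```php
    <?php
    // Assumed Composer layout; adjust the path to your project.
    require __DIR__ . '/vendor/autoload.php';

    use Ratchet\ConnectionInterface;
    use Ratchet\MessageComponentInterface;

    // Hypothetical component: sends each incoming message back to its sender.
    class EchoComponent implements MessageComponentInterface
    {
        public function onOpen(ConnectionInterface $conn)
        {
            // Called once when a client connects; nothing to set up here.
        }

        public function onMessage(ConnectionInterface $from, $msg)
        {
            // Called for every message; reply on the same connection.
            $from->send($msg);
        }

        public function onClose(ConnectionInterface $conn)
        {
            // Called when the client disconnects; nothing to clean up here.
        }

        public function onError(ConnectionInterface $conn, \Exception $e)
        {
            // Drop the connection when something goes wrong.
            $conn->close();
        }
    }
    ```

    An instance of such a class would be what gets passed to WsServer when the server is started.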

    I think this is how the Ratchet library intends it to be used. The tutorial finally starts the server like this:

    use Ratchet\Server\IoServer;
    use MyApp\MyCustomMessageComponentInterface;
    use Ratchet\Http\HttpServer;
    use Ratchet\WebSocket\WsServer;

    require dirname(__DIR__) . '/vendor/autoload.php';

    // Wrap the custom component in the websocket and HTTP layers, listening on port 8080
    $server = IoServer::factory(
        new HttpServer(
            new WsServer(
                new MyCustomMessageComponentInterface()
            )
        ),
        8080
    );
    $server->run();
    

    With this architecture you can already receive messages (onMessage), and sending is possible with the send() method.

    I cannot solve the exact problem with your existing code. But if you use the library's pre-built classes and interfaces as intended (and as demonstrated here), you should be able to achieve what you want by adding your code to the corresponding methods.
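
    On the client side, the reply presumably never arrives before the var_dump because nothing has run the ReactPHP event loop at that point. One possible approach, sketched here under assumptions, is to create the loop yourself, pass it to \Ratchet\Client\connect(), and call run() so that the controller blocks until the connection closes or a timer fires. The function name fetchList, the default timeout, and the autoload path are hypothetical; the login and list messages mirror the code in the question.

    ```php
    <?php
    // Assumed Composer layout; adjust the path to your project.
    require __DIR__ . '/vendor/autoload.php';

    use Ratchet\Client\WebSocket;
    use React\EventLoop\Factory;

    /**
     * Hypothetical helper: runs the event loop until the 'list' reply has
     * arrived and the connection is closed, or until the timeout expires.
     * Returns the decoded reply, or null on timeout or connection failure.
     */
    function fetchList(string $url, float $timeout = 10.0)
    {
        $loop = Factory::create();
        $result = null;

        // Give up after $timeout seconds.
        $timer = $loop->addTimer($timeout, function () use ($loop) {
            $loop->stop();
        });

        \Ratchet\Client\connect($url, [], [], $loop)->then(
            function (WebSocket $conn) use (&$result, $loop, $timer) {
                $conn->on('message', function ($msg) use ($conn, &$result) {
                    $response = json_decode((string) $msg);
                    if (isset($response->result_id) && $response->result_id == 100) {
                        // Logged in; send the actual request.
                        $conn->send(json_encode(['target' => 'test']));
                    }
                    if (isset($response->reply) && $response->reply == 'list') {
                        $result = $response;
                        $conn->close();
                    }
                });
                // Stop the loop once the connection is closed.
                $conn->on('close', function () use ($loop, $timer) {
                    $loop->cancelTimer($timer);
                    $loop->stop();
                });
                $conn->send(json_encode(['action' => 'login']));
            },
            function ($e) use ($loop, $timer) {
                // Could not connect; stop waiting.
                $loop->cancelTimer($timer);
                $loop->stop();
            }
        );

        $loop->run(); // Blocks here until stop() is called or nothing is left to do
        return $result;
    }
    ```

    In the controller this could be called as var_dump(fetchList('ws://192.168.56.1:8080')). The whole request still blocks while the loop runs, which is the intended trade-off in a classic request/response application.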

    More information and examples can be found in the docs:

    http://socketo.me/docs/server

    http://socketo.me/api/namespace-Ratchet.html