Search code examples
phpzeromqratchet

Lost messages in PUSH/PULL pattern ( Ratchet + PHP + ZeroMQ push integration )


I'm creating a chat on my website, a push notifications system, users activity widget ( updating on the fly ) etc.

My website is built on PHP, so I decided to use Ratchet as a websocket server for my tasks. I've installed all required components and I learned the guide on http://socketo.me/docs/push and started to code.

This is inside a ChatMsg( $item ){...} method in the model.php file. It creates a PUSH socket-access-point archetype and sends a message with JSON-data to server via ZeroMQ after inserting a new item in a database:

$context = new ZMQContext();
$socket = $context->getSocket(ZMQ::SOCKET_PUSH, 'my pusher');
$socket->connect("tcp://localhost:5555");
$socket->send(json_encode($sData));

Next is my push-server.php, creating only one PULL socket access-point archetype and waiting for new messages, which will be transferred to the pusher script, broadcasting new notifications, chat messages and other events to clients.

<?php 
require dirname(__DIR__) . '/vendor/autoload.php';

    $loop   = React\EventLoop\Factory::create();
    $pusher = new MyApp\Pusher;

    // Listen for the web server to make a ZeroMQ push after an ajax request
    $context = new React\ZMQ\Context($loop);
    $pull = $context->getSocket(ZMQ::SOCKET_PULL);
    $pull->setSockOpt(ZMQ::SOCKOPT_HWM, 0);
    $pull->bind('tcp://127.0.0.1:5555'); // Binding to 127.0.0.1 means the only client that can connect is itself
    $pull->on('error', function ($e) {
        $f = fopen('push-server-error.log', "a");
        fwrite($f, $e->getMessage()."\n");
        fclose($f);
    });
    $pull->on('message', array($pusher, 'onNewEvent'));

    // Set up our WebSocket server for clients wanting real-time updates
    $webSock = new React\Socket\Server($loop);
    $webSock->listen(8081, '0.0.0.0'); // Binding to 0.0.0.0 means remotes can connect
    $webServer = new Ratchet\Server\IoServer(
        new Ratchet\Http\HttpServer(
            new Ratchet\WebSocket\WsServer(
                new Ratchet\Wamp\WampServer(
                    $pusher
                )
            )
        ),
        $webSock
    );

    $loop->run();
?>

I successfully started the push-server.php with a monitoring tool Supervisor, I set up an NGINX proxying for WebSocket traffic, set up client side scripts ( autobahn and so on ).

In general, I was going to use it all at production. First hours I modified new chat systems on my website, I tested it and it all worked perfectly.

But I faced the problem a little bit later. Some ZeroMQ messages ( only part of them, maybe 5-10% ) are lost after sending via ZeroMQ PUSH socket. At that, this problem appears when around 300-400 messages are sent since the moment the push-server.php process was started.

I am deeply convinced that this problem is inside the ZeroMQ ( NOT inside JS client side or Pusher script with business logic ) because I tried to modify the "->on(){...}" method in the push-server.php so as to display new messages on terminal ( console ) and lost messages even does not get displayed on console, i.e. the "->on(){...}" method does not catch up them.

ZeroMQ "->send()" method always returns an empty ZeroMQ socket object, when a message was successfully sent or LOST. I checked this simply by sending chat messages on my website and by getting responses ( form submitting realized with AJAX ):

var_dump($socket->send(json_encode($sData)));

What there can be this problem and how to solve it?

Server OS:      CentOS 6.9 (Final)
PHP version:           5.6.31
ZMQ extension version: 1.1.3
libzmq version:        4.2.2

Solution

  • I created not persistent ZMQ::Context and ZMQ::Socket and my problem was solved:

    $context = new ZMQContext(1, false);
    $socket = $context->getSocket(ZMQ::SOCKET_PUSH);