React ZMQ, Router to Dealer, not working. ZMQ 4.0.5


I want to do asynchronous router-to-dealer messaging with React, but it isn't working. The code in http://zguide.zeromq.org/php:rtdealer works, but I can't identify what I'm doing differently. I'm using libzmq 4.0.5.

Here is my code:

$loop = React\EventLoop\Factory::create();

$context = new React\ZMQ\Context($loop);

$worker = $context->getSocket(\ZMQ::SOCKET_DEALER);
$worker->setSockOpt(\ZMQ::SOCKOPT_IDENTITY, 'A');
$worker->connect('tcp://127.0.0.1:5556');
$worker->send('END');

$worker->on('error', function ($e) {
    var_dump($e->getMessage());
});

$worker->on('messages', function($msg) use ($worker) {
    echo 'Dealer messages'. PHP_EOL;
    var_dump($msg);
});

$worker->on('message', function($msg) use ($worker) {
    echo 'Dealer message'. PHP_EOL;
    var_dump($msg);
});

$router = $context->getSocket(\ZMQ::SOCKET_ROUTER);
$router->bind('tcp://127.0.0.1:5556');

$i = 0;
$loop->addPeriodicTimer(1, function (React\EventLoop\Timer\Timer $timer) use (&$i, $router) {
    echo 'Time to send!'. PHP_EOL;
    $i++;
    $router->send('A', \ZMQ::MODE_SNDMORE);
    $router->send('END');
});

$router->on('messages', function($msg) use ($router) {
    echo 'Router messages'. PHP_EOL;
    var_dump($msg);
});

$router->on('message', function($msg) {
    echo 'Router message'. PHP_EOL;
    var_dump($msg);
});

$loop->run();

The problem is that only the dealer's first message, "END", gets through. After that, the router tries to send messages, but the dealer never receives them.

Also, this function seems to be only called once:

// \React\ZMQ\SocketWrapper
public function handleReadEvent()
{
    $messages = $this->socket->recvmulti(ZMQ::MODE_NOBLOCK);
    echo 'Receiving...';    // Added
    var_dump($messages);    // Added

    if ($messages !== false) {
        if (count($messages) === 1) {
            $this->emit('message', array($messages[0]));
        }

        $this->emit('messages', array($messages));
    }
}

The output is:

Receiving...array(2) {
  [0]=>
  string(1) "A"
  [1]=>
  string(3) "END"
}
Router messages
array(2) {
  [0]=>
  string(1) "A"
  [1]=>
  string(3) "END"
}
Time to send!
Time to send!
Time to send!
Time to send!
Time to send!
...
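
A side note on the output: "Router messages" is printed but "Router message" never is. That matches the branching in the handleReadEvent() method quoted earlier. A ROUTER socket prepends the sender's identity frame, so the dealer's single-frame "END" arrives as two frames ("A", "END"), and the 'message' event is only emitted for single-frame reads. Here is a minimal sketch of that branching; eventsFor() is a hypothetical helper written for illustration, not part of React\ZMQ:

```php
<?php
declare(strict_types=1);

/**
 * Returns the event names that the quoted handleReadEvent() would emit
 * for one read. Hypothetical helper, not part of React\ZMQ.
 *
 * @param array<int, string>|false $frames what recvmulti() returned
 * @return array<int, string>
 */
function eventsFor($frames): array
{
    if ($frames === false) {
        return []; // nothing was readable, so nothing is emitted
    }

    $events = [];
    if (count($frames) === 1) {
        $events[] = 'message'; // only single-frame messages
    }
    $events[] = 'messages'; // every successful read

    return $events;
}

// Two frames, as seen by the router (identity + payload).
echo implode(',', eventsFor(['A', 'END'])), PHP_EOL; // messages

// One frame, as a dealer would see a plain payload.
echo implode(',', eventsFor(['END'])), PHP_EOL; // message,messages
```

So on the router side, only the 'messages' handler is useful; the 'message' handler can be expected to stay silent for anything a dealer sends.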

Edit:

I changed the code to bind the router before the dealer connects to it, but the problem still happens:

$loop = React\EventLoop\Factory::create();

$context = new React\ZMQ\Context($loop);

$router = $context->getSocket(\ZMQ::SOCKET_ROUTER);
$router->bind('tcp://127.0.0.1:5556');

$loop->addPeriodicTimer(10, function (React\EventLoop\Timer\Timer $timer) use ($router) {
    echo 'Router sending messages with an interval of 10 seconds'. PHP_EOL;
    $router->send('A', \ZMQ::MODE_SNDMORE);
    $router->send('END');
});

$router->on('messages', function($msg) use ($router) {
    echo 'Router messages'. PHP_EOL;
    var_dump($msg);
});

$router->on('message', function($msg) {
    echo 'Router message'. PHP_EOL;
    var_dump($msg);
});

$worker = $context->getSocket(\ZMQ::SOCKET_DEALER);

$loop->addPeriodicTimer(5, function (React\EventLoop\Timer\Timer $timer) use ($worker) {
    echo 'After 5 seconds from router binding, connect the dealer and send something'. PHP_EOL;
    $worker->setSockOpt(\ZMQ::SOCKOPT_IDENTITY, 'A');
    $worker->connect('tcp://127.0.0.1:5556');
    $worker->send('END');
    $timer->getLoop()->cancelTimer($timer);     // Cancel the timer after connecting
});

$worker->on('error', function ($e) {
    var_dump($e->getMessage());
});

$worker->on('messages', function($msg) use ($worker) {
    echo 'Dealer messages'. PHP_EOL;
    var_dump($msg);
});

$worker->on('message', function($msg) use ($worker) {
    echo 'Dealer message'. PHP_EOL;
    var_dump($msg);
});

$loop->run();

This is the terminal output:

After 5 seconds from router binding, connect the dealer and send something
Receiving...array(2) {
  [0]=>
  string(1) "A"
  [1]=>
  string(3) "END"
}
Router messages
array(2) {
  [0]=>
  string(1) "A"
  [1]=>
  string(3) "END"
}
Router sending messages with an interval of 10 seconds
Router sending messages with an interval of 10 seconds
Router sending messages with an interval of 10 seconds
Router sending messages with an interval of 10 seconds
Router sending messages with an interval of 10 seconds
Router sending messages with an interval of 10 seconds
Router sending messages with an interval of 10 seconds
Router sending messages with an interval of 10 seconds

Solution

  • I've finally solved the problem. Prompted by @Jason's comment ("Not having used PHP/React, are you certain your namespacing is correct?"), I looked at the send method being called. It looks like this:

    // \React\ZMQ\SocketWrapper
    
    public function send($message)
    {
        echo 'Inside send, sending message'. PHP_EOL; // Added
        $this->buffer->send($message);
    }
    

    Then I thought: if I call the method this way:

    $router->send('A', \ZMQ::MODE_SNDMORE);
    

    the second argument is never passed on, because send() declares only $message. The \ZMQ::MODE_SNDMORE flag is silently dropped.

    Then I saw that the sendmulti method is not implemented in SocketWrapper, and that the class implements __call to forward calls directly to the method implemented in the ZMQ API for PHP. So I tried calling sendmulti with the two parameters wrapped in an array. It worked, so I tried calling the original send method with the same array, and I don't know why, but it worked too. So the trick was calling:

    $router->send(array('A', \ZMQ::MODE_SNDMORE));
    $router->send(array('END'));
    

    Instead of:

    $router->send('A', \ZMQ::MODE_SNDMORE);
    $router->send('END');
    

    Also, be careful with the ZMQ PHP API located in the GitHub repository: it's wrong. PhpStorm somehow downloaded another ZMQ PHP API from somewhere, and that one seems to be correct. In it, the sendmulti method prototype looks like this:

    public function sendmulti(array $message, $mode = 0)
    

    Surprisingly, the following calls to this method seem to give the same results:

    $router->send(array('A', \ZMQ::MODE_SNDMORE));
    $router->send(array('A'), \ZMQ::MODE_SNDMORE);
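
A possible explanation for why passing an array to send() works, sketched without ext-zmq. RecordingSocket and WrapperSketch are hypothetical stand-ins, and MODE_SNDMORE is a placeholder for \ZMQ::MODE_SNDMORE. The key assumption, which is not confirmed by the code quoted in this answer, is that the buffer behind send() turns a scalar into a one-frame array and sends arrays through sendmulti():

```php
<?php
declare(strict_types=1);

// Placeholder for \ZMQ::MODE_SNDMORE so the sketch runs without ext-zmq.
const MODE_SNDMORE = 2;

/** Hypothetical stand-in for the underlying socket; it only records calls. */
final class RecordingSocket
{
    /** @var array<int, array<int, mixed>> frames of each message sent */
    public array $sentMessages = [];

    public function sendmulti(array $frames, int $mode = 0): void
    {
        $this->sentMessages[] = $frames;
    }
}

/**
 * Hypothetical stand-in for the wrapper. Like the send() quoted in the
 * answer, it declares a single parameter.
 * ASSUMPTION: scalars become one-frame arrays, arrays go to sendmulti().
 */
final class WrapperSketch
{
    private RecordingSocket $socket;

    public function __construct(RecordingSocket $socket)
    {
        $this->socket = $socket;
    }

    public function send($message): void
    {
        // Any extra positional argument is silently ignored by PHP here.
        $frames = is_array($message) ? $message : [$message];
        $this->socket->sendmulti($frames);
    }
}

$socket = new RecordingSocket();
$router = new WrapperSketch($socket);

$router->send('A', MODE_SNDMORE);   // flag dropped, frames: ['A']
$router->send(['A', MODE_SNDMORE]); // flag becomes a frame: ['A', 2]
$router->send(['A'], MODE_SNDMORE); // flag dropped, frames: ['A']
$router->send(['A', 'END']);        // identity + payload in one message

foreach ($socket->sentMessages as $frames) {
    echo json_encode($frames), PHP_EOL;
}
```

Under that assumption, the flag is never interpreted as a flag in either of the two "surprising" forms: it is either dropped or sent as a frame of its own, which may be why they appear to behave alike. Since a ROUTER socket treats the first frame as the destination identity, passing the identity and the payload together, as in $router->send(array('A', 'END')), is the form this sketch would suggest trying.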