Multiple types of messaging queues in php with php-amqplib and rabbitmq


I'm trying to create a simple app that does four things:

1) get a list of consumers (ideally the ones that have registered; maybe I can name them when they join, so the list is dynamic).

2) send a "message" to one random consumer and display the result.

3) send a "message" to one specific consumer (taken from the list above, or from a predefined list) and display the result.

4) send a "message" to all consumers and display the results as they come in from each one.

The app is written in PHP with php-amqplib (https://github.com/videlalvaro/php-amqplib). RabbitMQ is up and running and seems to work (I tried the tutorials).

I find the documentation for the AMQP library a bit confusing, so I'd greatly appreciate a few lines of example code and a description of the parameters used.


Solution

  • 1) can be solved with the RabbitMQ management plugin's /api/queues endpoint, provided each consumer declares a queue under its own fixed name.
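
    For 1), a minimal sketch of reading the consumer list from the management API could look like the following. It assumes the management plugin is enabled on its default port 15672, that the credentials are allowed to query it, and that broker-named queues (names starting with amq.gen-, such as the producer's reply queue) should be skipped. Both function names are hypothetical helpers, not part of php-amqplib.

    ```php
    <?php
    // Hypothetical helper: pick consumer names out of the JSON returned by
    // GET /api/queues. Each entry describes one queue; "name" and "consumers"
    // (the number of consumers attached to the queue) are the fields used here.
    function consumer_names_from_queues_json($json)
    {
        $queues = json_decode($json, true);
        if (!is_array($queues)) {
            return array(); // not valid JSON, or not a list of queues
        }
        $names = array();
        foreach ($queues as $queue) {
            if (!isset($queue['name'], $queue['consumers'])) {
                continue;
            }
            // Skip broker-named queues such as the producer's reply queue.
            if (strpos($queue['name'], 'amq.gen-') === 0) {
                continue;
            }
            // Keep only queues that currently have a consumer attached.
            if ($queue['consumers'] > 0) {
                $names[] = $queue['name'];
            }
        }
        return $names;
    }

    // Hypothetical helper: fetch /api/queues over HTTP with basic auth.
    // Assumption: the management plugin listens on port 15672 without TLS.
    function fetch_queues_json($host, $user, $pass, $port = 15672)
    {
        $context = stream_context_create(array('http' => array(
            'header'  => 'Authorization: Basic ' . base64_encode($user . ':' . $pass),
            'timeout' => 5,
        )));
        $json = @file_get_contents("http://$host:$port/api/queues", false, $context);
        return $json === false ? '[]' : $json; // treat a failed request as "no queues"
    }
    ```

    With these in place, the hard-coded list in the producer config could be replaced by something like consumer_names_from_queues_json(fetch_queues_json($host, $user, $pass)).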

    2), 3) and 4) got solved like this:

    <?php
    //Producer Config
    $host = "remote_host";
    $port = 5672;
    $user = "user";
    $pass = "pass";
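    $array = array("consumer1", "consumer2"); // names of the known consumers
    $timeout = 10; // example value: seconds to wait for a reply before giving up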
    

    ...

    <?php
    //Producer
    
    if(!isset($argv[1]) || !isset($argv[2])){
            die("Specify a target and a message\n");
    }
    
    require_once __DIR__.'/config.php';
    require_once __DIR__.'/vendor/autoload.php';
    use PhpAmqpLib\Connection\AMQPSSLConnection;
    use PhpAmqpLib\Message\AMQPMessage;
    
    $corr_id = uniqid();
    
    $connection = new AMQPSSLConnection($host, $port, $user, $pass, "/", array("verify_peer" => false));
    $channel = $connection->channel();
    
    $response = null;
    // Capture $response by reference instead of relying on a global
    $onResponse = function ($rep) use (&$response) {
            $response = $rep->body;
            echo " [>] Received: '".$response."'\n";
    };
    
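    // Broker-named reply queue: passive=false, durable=false, exclusive=true, auto_delete=false
    list($callback_queue, ,) = $channel->queue_declare("", false, false, true, false);
    // 4th argument no_ack=true: replies need no explicit acknowledgement
    $channel->basic_consume($callback_queue, '', false, true, false, false, $onResponse);
    // Reuse $corr_id so a reply can be matched to this request
    $msg = new AMQPMessage($argv[2], array('correlation_id' => $corr_id, 'reply_to' => $callback_queue));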
    
    switch($argv[1]){
            case "random":
                    $dest = $array[array_rand($array)];
                    $type = "direct";
                    break;
            case "all":
                    $dest = "to_all";
                    $type = "fanout";
                    break;
            default: // any other target is treated as a consumer name
                    $dest = $argv[1];
                    $type = "direct";
                    break;
    }
    
    $channel->exchange_declare($dest, $type, false, false, false);
    $channel->basic_publish($msg, $dest);
    echo " [<] Sent '".$argv[2]."' to '".$dest."'\n";
    
    try {
            if($dest == "to_all"){
                    $replies = 0;
                    // $array is the consumer list from config.php
                    while(!$response || $replies < count($array)){
                            $channel->wait(null, false, $timeout);
                            $replies++;
                    }
            }else{
                    while(!$response){
                            $channel->wait(null, false, $timeout);
                    }
            }
    }catch(PhpAmqpLib\Exception\AMQPTimeoutException $e){
            echo " [x] AMQPTimeoutException thrown\n";
    }
    
    $channel->close();
    $connection->close();
    

    ...

    <?php
    //Consumer config
    $host = "remote_host";
    $port = 5672;
    $user = "user";
    $pass = "pass";
    $consumer_name = "consumerX";
    

    ...

    <?php
    //Consumer
    require_once __DIR__ . '/config.php';
    require_once __DIR__ . '/vendor/autoload.php';
    use PhpAmqpLib\Connection\AMQPSSLConnection;
    use PhpAmqpLib\Message\AMQPMessage;
    
    $connection = new AMQPSSLConnection($host, $port, $user, $pass, "/", array('verify_peer' => false));
    $channel = $connection->channel();
    
    $channel->exchange_declare('to_all', 'fanout', false, false, false);
    $channel->exchange_declare($consumer_name, 'direct', false, false, false);
    $channel->queue_declare($consumer_name, false, false, true, false);
    $channel->queue_bind($consumer_name, 'to_all');
    $channel->queue_bind($consumer_name, $consumer_name);
    echo '[*] Waiting for messages. To exit press CTRL+C', "\n";
    
    $callback = function($msg) {
            echo " [>] Received: '".$msg->body."'\n";
            $rand = rand(1,3);
            sleep($rand);
            $reply = uniqid()." - slept ".$rand;
            echo " [<] Replied: '".$reply."'\n";
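            // Echo the request's correlation_id so the producer can match the reply
            $raspuns = new AMQPMessage($reply, array('correlation_id' => $msg->get('correlation_id')));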
            $msg->delivery_info['channel']->basic_publish($raspuns,'',$msg->get('reply_to'));
    };
    
    $channel->basic_consume($consumer_name, '', false, true, false, false, $callback);
    
    while(count($channel->callbacks)) {
            $channel->wait();
    }
    
    $channel->close();
    $connection->close();
    

    Not sure this is the best way but it got my "hello world" going.
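
    Since the question asked what the positional parameters mean, here is a sketch that wraps the three channel calls the consumer makes and names each argument. The wrapper function names are made up for illustration. The argument order follows the php-amqplib channel methods as used in the code above, so it is worth checking against the version you have installed.

    ```php
    <?php
    // Hypothetical wrappers; $channel is the object returned by $connection->channel().

    function declare_fanout_exchange($channel, $name)
    {
        $channel->exchange_declare(
            $name,
            'fanout', // type: a fanout exchange copies each message to every bound queue
            false,    // passive: false = create the exchange if it does not exist
            false,    // durable: false = the exchange does not survive a broker restart
            false     // auto_delete: false = keep it after the last queue unbinds
        );
    }

    function declare_consumer_queue($channel, $name)
    {
        return $channel->queue_declare(
            $name,
            false, // passive: false = create the queue if it does not exist
            false, // durable: false = the queue does not survive a broker restart
            true,  // exclusive: only this connection may use it; removed when the connection closes
            false  // auto_delete
        );
    }

    function consume_with_auto_ack($channel, $queue, $callback)
    {
        return $channel->basic_consume(
            $queue,
            '',    // consumer_tag: empty = let the broker generate one
            false, // no_local: left at its default
            true,  // no_ack: true = messages count as acknowledged as soon as they are delivered
            false, // exclusive: false = other consumers may share the queue
            false, // nowait: false = wait for the broker to confirm the subscription
            $callback
        );
    }
    ```

    The consumer script could then call declare_fanout_exchange($channel, 'to_all'), declare_consumer_queue($channel, $consumer_name) and consume_with_auto_ack($channel, $consumer_name, $callback) in place of the bare positional calls.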