Search code examples
phprabbitmqphp-amqplib

Writing a subscribe method of RabbitMQ in PHP


I have a function as defined below:

public function subscribe($someQueue)
{
    $callback = function($msg){
        return $msg->body;
    };
    $this->channel->basic_consume( $someQueue, '', FALSE, TRUE, FALSE, FALSE, $callback);
    while(count($this->channel->callbacks)) {
         $this->channel->wait();
    }
}

I'm using the following function:

Note: Following lines are in a different class file, hence creating object of the class that contains the above function.

$objRMQ = new RabbitMQ();
$msgBody = $objRMQ->subscribe("someQueue");
echo "message body returned from someMethod: ".$msgBody; 

Basically, I want to return body of every message to the caller function that is published to the queue.

Current output:

message body returned from subscribe: NULL

Expected output:

holla, this is your message from queue

Solution

  • Since this question is old but still unanswered, I'll give a brief explanation. You've probably already figured out the answer by now, but this might help someone else searching in future.

    The key concept here is "asynchronous execution".

    When you subscribe to a channel using the basic_consume method, you are not asking for the callback to be executed once, immediately, but for it to be executed once a message becomes available, and then every time another message is available.

    In the case of AMQPLib, you wait for new messages by repeatedly calling the wait() method; i.e. here:

    while(count($this->channel->callbacks)) {
         $this->channel->wait();
    }
    

    Thinking about this carefully, there are two mistakes in your code:

    • The line return $msg->body has nowhere to return to. The call will happen somewhere deep in the implementation of the wait() method, and you get no output from $this->channel->wait(), so have no way of doing anything with that returned value.
    • On the other side, when you call $objRMQ->subscribe("someQueue") from your other class, you are expecting it to return something, but that function has no return statement. The only return statement is inside the anonymous function you passed to basic_consume.

    The solution is basically to do all your processing of the message - echo $msg->body, or whatever real processing you want to do - inside the callback. If you really want to gather data as messages come in, you could save it to some variable accessible outside the callback, but remember that you will at some point need to break out of the wait() loop in order to do anything with that data.