php rabbitmq worker php-amqp

RabbitMQ and PHP - Process multiple queues with one worker (broker)


I have 1000 queues with specific names, and I want to process them with a single broker. Is this possible?

The queue names are stored in a MySQL database, so I need to fetch them and run the broker for each one. It should of course run asynchronously and be able to pass a queued item to an idle broker. Is this possible, or do I have to create 1000 files, one broker per queue name?

Update: this is a picture of my queues. The queues should be processed in parallel, not serially. The users are the producers, and the worker is the consumer that runs the send_message() method.

[image: diagram of the queues]


Solution

  • I can show you how to do it with the enqueue library. Be warned that a single process cannot consume messages asynchronously; it handles them one at a time. You can, however, run a few processes that each serve a set of queues. The queues could be divided into groups by importance.

    Install the AMQP transport and consumption library:

    composer require enqueue/amqp-ext enqueue/enqueue
    

    Create a consumption script. I assume you have already fetched the queue names from the DB into an array stored in the $queueNames variable. The example binds the same processor to every queue, but you can of course bind a different one to each.

    <?php
    
    use Enqueue\AmqpExt\AmqpConnectionFactory;
    use Enqueue\Consumption\QueueConsumer;
    use Enqueue\Psr\PsrMessage;
    use Enqueue\Psr\PsrProcessor;
    
    // The list of queue names fetched from the DB.
    $queueNames = ['foo_queue', 'bar_queue', 'baz_queue'];
    
    $factory = new AmqpConnectionFactory('amqp://');
    
    $context = $factory->createContext();
    
    // Declare the queues on the RabbitMQ side; remove this loop if they already exist.
    foreach ($queueNames as $queueName) {
        $queue = $context->createQueue($queueName);
        $queue->addFlag(AMQP_DURABLE);
    
        $context->declareQueue($queue);
    }
    
    $consumer = new QueueConsumer($context);
    
    foreach ($queueNames as $queueName) {
        $consumer->bind($queueName, function(PsrMessage $psrMessage) use ($queueName) {
            echo 'Consumed a message from queue: '.$queueName.PHP_EOL;
    
            // your processing logic.
    
            return PsrProcessor::ACK;
        });
    }
    
    $consumer->consume();
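
    The script above assumes that $queueNames is already populated. As a rough sketch of that step, the names could be read from MySQL with PDO. The table name `queues`, the column `name`, the function name fetchQueueNames() and the connection details in the comment are all assumptions for illustration; adjust them to your schema.

    ```php
    <?php

    /**
     * Fetch every queue name from a hypothetical `queues` table
     * that has a `name` column (both names are assumptions).
     *
     * @return string[]
     */
    function fetchQueueNames(PDO $pdo): array
    {
        $statement = $pdo->query('SELECT name FROM queues ORDER BY name');

        // FETCH_COLUMN returns a flat array holding the first selected column.
        return $statement->fetchAll(PDO::FETCH_COLUMN);
    }

    // Possible usage (placeholder DSN and credentials):
    // $pdo = new PDO('mysql:host=localhost;dbname=app', 'user', 'secret', [
    //     PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
    // ]);
    // $queueNames = fetchQueueNames($pdo);
    ```

    Enabling PDO::ERRMODE_EXCEPTION makes a failed query throw instead of returning false, so the worker stops early rather than starting with an empty queue list.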
    

    See the enqueue documentation for more details.
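
    To run a few processes that each serve a set of queues, every process needs to know which queue names are its own. The sketch below is one possible way to split the list; it uses a plain round-robin split rather than grouping by importance, and the function name queuesForWorker() and the idea of passing a worker index and a worker count are assumptions, not part of enqueue.

    ```php
    <?php

    /**
     * Return the queue names that worker $workerIndex (0-based)
     * out of $workerCount workers should serve, using a round-robin split.
     *
     * @param string[] $queueNames
     *
     * @return string[]
     */
    function queuesForWorker(array $queueNames, int $workerIndex, int $workerCount): array
    {
        if ($workerCount < 1 || $workerIndex < 0 || $workerIndex >= $workerCount) {
            throw new InvalidArgumentException('Worker index must be between 0 and workerCount - 1.');
        }

        $assigned = [];

        // array_values() discards any custom keys so positions are 0, 1, 2, ...
        foreach (array_values($queueNames) as $position => $queueName) {
            if ($position % $workerCount === $workerIndex) {
                $assigned[] = $queueName;
            }
        }

        return $assigned;
    }
    ```

    Each process could then be started with its own index, for example `php consume.php 0 4` through `php consume.php 3 4` (the script name is hypothetical), and bind only the names returned by queuesForWorker($queueNames, (int) $argv[1], (int) $argv[2]). With 1000 queues and 4 processes, each process would serve 250 queues.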