Search code examples
phplaraveldockeractivemq-classicamqp

How to use php-amqplib to connect to ActiveMQ Classic Docker image


I am trying to create a custom command that should consume messages from an ActiveMQ Classic queue. I am using php-amqplib, and I created my custom connector and ActiveMQServiceProvider. I am getting this error when running sail artisan horizon:consume-activemq

PhpAmqpLib\Exception\AMQPInvalidFrameException 

Invalid frame type 65

...

  7   app/Queue/Connectors/ActiveMQConnector.php:15
      PhpAmqpLib\Connection\AMQPStreamConnection::__construct()

  8   app/Console/Commands/ConsumeActiveMQMessages.php:29
      App\Queue\Connectors\ActiveMQConnector::connect()

I checked in the Docker log and it seems like the php-amqplib does not support AMQP v1.0:

Connection attempt from non AMQP v1.0 client. AMQP,0,0,9,1
2024-02-22 13:24:44  WARN | Transport Connection to: tcp://192.168.65.1:32999 failed: org.apache.activemq.transport.amqp.AmqpProtocolException: Connection from the client using unsupported AMQP attempted

Did I misunderstand or misconfigure anything?

These are my configurations:

queue.php

    'connections' => [

...
        'activemq' => [
            'driver' => 'activemq',
            'host' => env('ACTIVEMQ_HOST', 'localhost'),
            'port' => env('ACTIVEMQ_PORT', 61613),
            'username' => env('ACTIVEMQ_USERNAME', 'guest'),
            'password' => env('ACTIVEMQ_PASSWORD', 'guest'),
            'queue' => env('ACTIVEMQ_QUEUE', ''),
            'exchange_name' => env('ACTIVEMQ_EXCHANGE_NAME', ''),
        ],

mylocal .env

ACTIVEMQ_HOST=host.docker.internal
ACTIVEMQ_PORT=5672
ACTIVEMQ_USER=admin
ACTIVEMQ_PASSWORD=admin
ACTIVEMQ_QUEUE=activemqTest

ActiveMQServiceProvider.php

<?php

namespace App\Providers;

use App\Queue\Connectors\ActiveMQConnector;
use Illuminate\Queue\QueueManager;
use Illuminate\Support\ServiceProvider;

class ActiveMQServiceProvider extends ServiceProvider
{
    /**
     * Register services.
     */
    public function register(): void
    {
    }

    /**
     * Bootstrap services.
     */
    public function boot(): void
    {
        $this->app->make(QueueManager::class)->addConnector('activemq', function () {
            return new ActiveMQConnector();
        });
    }
}

ActiveMQConnector.php

<?php

namespace App\Queue\Connectors;

use Illuminate\Queue\Connectors\ConnectorInterface;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class ActiveMQConnector implements ConnectorInterface
{
    /**
     * @throws \Exception
     */
    public function connect(array $config)
    {
        return new AMQPStreamConnection(
            $config['host'],
            $config['port'],
            $config['username'],
            $config['password'],
            $config['vhost']
        );
    }
}

ConsumeActiveMQMessages.php

<?php

namespace App\Console\Commands;

use App\Queue\Connectors\ActiveMQConnector;
use Illuminate\Console\Command;
use PhpAmqpLib\Message\AMQPMessage;

class ConsumeActiveMQMessages extends Command
{
    protected $signature = 'horizon:consume-activemq';

    protected $description = 'Consume messages from ActiveMQ and process them within Horizon';

    /**
     * @throws \Exception
     */
    public function handle()
    {
        $connector = new ActiveMQConnector();
        $config = [
            'host' => config('queue.connections.activemq.host'),
            'port' => config('queue.connections.activemq.port'),
            'username' => config('queue.connections.activemq.username'),
            'password' =>config('queue.connections.activemq.password'),
            'vhost' => config('queue.connections.activemq.vhost') !== null ?config('queue.connections.activemq.vhost') : '/',
        ];

        $connection = $connector->connect($config);

        $channel = $connection->channel();

        $callback = function (AMQPMessage $message) {
            $this->processMessage($message);
        };

        $channel->basic_consume(config('queue.connections.activemq.queue'), '', false, true, false, false, $callback);

        while ($channel->is_consuming()) {
            $channel->wait();
        }

        $channel->close();
        $connection->close();
    }

    protected function processMessage(AMQPMessage $message)
    {
        $this->info('Received message: ' . $message->getBody());
    }
}

Laravel: V10.10 Php V8.1

I tried to change host and port but nothing changed.


Solution

  • ActiveMQ (both classic and Artemis) speaks AMQP 1.0 the ISO standard AMQP spec while the php-amqplib appears to only support the 0.9.1 draft standard so you cannot connect to ActiveMQ using that client. You'd need to find a php client that supports AMQP 1.0 if you need to use that runtime.

    Both brokers also support the STOMP protocol and I'd guess there are available STOMP clients for PHP.