How to exchange AMQP 1.0 messages using fe2o3-amqp and RabbitMQ with Topic Exchange


I'm trying to use the fe2o3-amqp crate (Rust) for receiving and sending messages through a RabbitMQ broker configured with the AMQP 1.0 protocol.

How do I configure the Connection, Session and Sender/Receiver for publishing/subscribing to a topic exchange?

This is what I've done so far. For topic exchange in RabbitMQ, I've configured an Exchange (named MyExchange, Durable, Type: Topic), a Queue (named MyQueue, type: Classic, Durable), and a Binding from MyExchange to MyQueue with routing key my.topic.

Using a Python script with pika on Python 3.9.2, I can send messages to my.topic and see them enqueued in MyQueue. I'm not sure whether pika uses AMQP 1.0 here (as far as I know it speaks AMQP 0-9-1), but it at least confirms that the exchange, binding and queue are working. The relevant part of the Python script is as simple as the following; the rest is just glue that creates the connection and channel:

channel.basic_publish(exchange='MyExchange', routing_key='my.topic', body='Hello topic!')

This is what most examples in the documentation do, except that they don't use Sender::builder():

    let mut connection = create_connection(
        "a-sender",
        "blah-blah",
    ).await?;

    // The session must be mutable because attach() borrows it as &mut.
    let mut session = Session::begin(&mut connection).await?;

    let sender = Sender::builder()
        .name("rust-sender-link-1")
        .target("some-queue-name")
        .attach(&mut session)
        .await
        .unwrap();

If I set the target to "my.topic" instead of a queue name, attaching fails with a SenderAttachError. Any other name just creates a new queue with that name.

So, how do I specify the exchange and route?

BR.


Solution

  • Thanks to @minghua-wu, the author of the fe2o3-amqp crate, for providing useful comments very promptly on the project's GitHub page.

    There is also a helpful question that was posted here a while ago: "Unable to get Topic Exchange to Work in RabbitMQ AMQP 1.0".

    The address used when attaching the sender or receiver to a session must start with the /exchange/ prefix and name both the exchange and the routing key, in the form /exchange/<exchange name>/<routing key>, so that the link is tied explicitly to that exchange and topic. Otherwise the attach will fail, or the link will go to one of the broker's default exchanges and messages will end up in a different queue.

    Before this, the exchange's binding (routing key) must be configured in the RabbitMQ broker so that messages are routed to the appropriate queue. Last but not least, the user's permissions must also be configured so that access to this topic is granted.

    // Address format: /exchange/<exchange name>/<routing key>
    let sender = Sender::attach(&mut session, "amq_topic_sender", "/exchange/my_exchange/my.topic").await.unwrap();
    let receiver = Receiver::attach(&mut session, "amq_topic_receiver", "/exchange/my_exchange/my.topic").await.unwrap();
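
The address convention used in the two calls above can be captured in a small helper. This is only a sketch: exchange_address is a hypothetical name, not part of fe2o3-amqp, and it assumes the /exchange/<exchange name>/<routing key> form shown in this answer.

```rust
/// Hypothetical helper (not part of fe2o3-amqp): builds a link address of the
/// form "/exchange/<exchange name>/<routing key>", as used in the answer above.
fn exchange_address(exchange: &str, routing_key: &str) -> String {
    // Assumption: neither part needs escaping; both are inserted verbatim.
    format!("/exchange/{}/{}", exchange, routing_key)
}
```

The returned String could then be passed as the address argument, for example Sender::attach(&mut session, "amq_topic_sender", exchange_address("my_exchange", "my.topic")), which keeps the sender and receiver addresses in sync.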