kotlin, go, rabbitmq, apache-camel, enterprise-integration

Can't do a simple request-reply (RPC) with RabbitMQ and Apache Camel


I hope you are well! First, I am new to the EIP world. I am trying to do a simple request-reply with:

  • A Golang RabbitMQ client
  • An Apache Camel route in Kotlin acting as a RabbitMQ server

I have read all the docs I could find and searched for answers, but I couldn't find anything. I am basically desperate. Mainly I saw this, and nothing has worked yet.

My goal is to do a synchronous request-reply, as in the image.

Request-reply pattern with RabbitMQ

My Golang client looks like this:

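// GetQueue declares the named queue (creating it if needed) and returns it.
func (r *RabbitMQConn) GetQueue(name string) *amqp.Queue {
    ch := r.GetChannel()
    defer ch.Close()
    q, err := ch.QueueDeclare(
        name,
        false, // durable
        false, // autoDelete
        true,  // exclusive: only this connection may use the queue
        false, // noWait
        nil,   // args
    )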
    if err != nil {
        panic(err)
    }
    return &q
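}

// PublishAndWait publishes event to routingKey on the default exchange and
// blocks until a reply with the same correlation ID arrives on the response queue.
func (r *RabbitMQConn) PublishAndWait(routingKey string, correlationId string, event domain.SyncEventExtSend) (domain.SyncEventExtReceive, error) {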
    message, err := json.Marshal(event)
    if err != nil {
        return domain.SyncEventExtReceive{}, apperrors.ErrInternal
    }
    ch := r.GetChannel()
    defer ch.Close()
    q := r.GetQueue("response")
    h, err := ch.Consume(
        q.Name,
        "",    // consumer tag: let the server generate one
        true,  // autoAck
        false, // exclusive
        false, // noLocal
        false, // noWait
        nil,   // args
    )

    if err != nil {
        return domain.SyncEventExtReceive{}, err
    }
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()
    err = ch.PublishWithContext(
        ctx,
        "",         // exchange: the default exchange
        routingKey, // routing key: the target queue name
        false,      // mandatory
        false,      // immediate
        amqp.Publishing{
            ContentType:   "application/json",
            Body:          message,
            CorrelationId: correlationId,
            ReplyTo:       q.Name,
        },
    )
    if err != nil {
        return domain.SyncEventExtReceive{}, err
    }

    for d := range h {
        fmt.Println("Received a message:", string(d.Body))
        if d.CorrelationId == correlationId {
            var event domain.SyncEventExtReceive
            err = json.Unmarshal(d.Body, &event)
            return event, err
        }
    }
    return domain.SyncEventExtReceive{}, apperrors.ErrInternal
}

Basically, I publish to the default exchange and consume from a named queue called response. I send that queue's name as the ReplyTo property and give the message a correlation ID. The routing key is daily-weather in this case.


On the server side, I first tried to use the default exchange, but Apache Camel does not let me do anything with that exchange.

from("rabbitmq:?queue=daily-weather&autoAck=true&autoDelete=false")

So I assigned it the amq.direct exchange. However, that didn't work either.

"rabbitmq:amq.direct?queue=daily-weather&autoAck=true&autoDelete=false"

Then I added a second RabbitMQ endpoint to see if it would send the reply, but nothing happened.


    from("rabbitmq:amq.direct?queue=daily-weather&autoAck=true&autoDelete=false")
        .log(LoggingLevel.INFO, "weather-daily", "Received message: \${body}")
        .to("rabbitmq:amq.direct?queue=response&autoAck=true&autoDelete=false")

Does anybody have a simple example of doing this with Apache Camel? I am completely lost. I can share any further detail if you contact me.

Thank you very much!!!! :)


Solution

  • SOLVED

    Hi! After some time I decided to take a look at the spring-rabbitmq Camel component. I realised that Camel has exchange patterns, and the RabbitMQ component, by default, sets the pattern to InOut. This way, it automatically sends the result back to the queue named in the replyTo property.

      val RABBIMQ_ROUTE =
          "spring-rabbitmq:default?queues={{rabbitmq.weather.daily.routing_key}}"
    

    default refers to the default exchange.
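To make the InOut behaviour more concrete, here is a rough sketch of the reply addressing that a request-reply server conventionally performs in the RabbitMQ RPC pattern: publish to the default exchange, use the request's ReplyTo as the routing key, and copy the correlation ID. This is an illustration under those assumptions, not Camel's actual implementation. The `message` type, `replyFor` and `errNoReplyTo` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// message is a hypothetical, simplified view of an AMQP message together
// with the exchange and routing key it is published with.
type message struct {
	Exchange      string
	RoutingKey    string
	ReplyTo       string
	CorrelationId string
	Body          []byte
}

// errNoReplyTo is a hypothetical error for requests that cannot be answered.
var errNoReplyTo = errors.New("request has no ReplyTo")

// replyFor builds the reply for a request: default exchange, routing key
// equal to the request's ReplyTo, and the same correlation ID.
func replyFor(req message, body []byte) (message, error) {
	if req.ReplyTo == "" {
		return message{}, errNoReplyTo
	}
	return message{
		Exchange:      "", // the default exchange routes by queue name
		RoutingKey:    req.ReplyTo,
		CorrelationId: req.CorrelationId,
		Body:          body,
	}, nil
}

func main() {
	// Request shaped like the one the Go client in the question sends.
	req := message{
		RoutingKey:    "daily-weather",
		ReplyTo:       "response",
		CorrelationId: "abc-123",
		Body:          []byte(`{}`),
	}
	reply, err := replyFor(req, []byte(`{"temp":21}`))
	fmt.Println(reply.RoutingKey, reply.CorrelationId, err)
}
```

With this addressing, the second `.to("rabbitmq:...")` endpoint from the question is not needed: the reply goes straight to whatever queue the client named in ReplyTo.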