Search code examples
springrabbitmqspring-integrationspring-amqpspring-integration-dsl

Spring integration (Manual Acking)


I want to create a simple IntegrationFlow with Spring integration, and I am having difficulties.

I want to create an integration flow that takes messages from a queue in Rabbit Mq and posts the messages to an endpoint Rest. I want to ack manually depending on the results of the post that I will make.

A typical behavior of the integration Flow would be like this:

  1. I receive a message in the queue.
  2. Spring detects it, takes the message and posts it in the Rest endpoint.
  3. The end point responds with a 200 code.
  4. Spring integration ack the message.

If the endpoint responds with an error code I want to be able to nack or retry.

HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
RestTemplate restTemplate = new RestTemplate();
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setQueueNames(BOUTIQUE_QUEUE_NAME);

/* Get Message from RabbitMQ */
return IntegrationFlows.from(Amqp.inboundAdapter(container)) 
    .handle(msg ->
    {
        String msgString = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
        HttpEntity<String> requestBody = new HttpEntity<String>(msgString, headers);
        restTemplate.postForObject(ENDPOINT_LOCAL_URL, requestBody, String.class);
        System.out.println(msgString);
    })
    .get();

Solution

  • You don't need to use manual acknowledge mode for this use case; if he rest call returns normally, the container will ack the message; if an exception is thrown, the container will nack the message and it will be redelivered.

    If you use manual acks, the Channel and deliveryTag are available in the AmqpHeaders.CHANNEL and AmqpHeaders.DELIVERY_TAG message headers and you can call basicAck or basicReject on the channel (you will have to add an error channel to the inbound adapter to handle errors.