I want to create a simple IntegrationFlow with Spring Integration, and I am having difficulties.
I want to create an integration flow that takes messages from a RabbitMQ queue and posts them to a REST endpoint. I want to ack
manually, depending on the result of the POST request.
A typical behavior of the integration flow would be like this:
If the endpoint responds with an error code, I want to be able to nack or retry.
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
RestTemplate restTemplate = new RestTemplate();
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setQueueNames(BOUTIQUE_QUEUE_NAME);
/* Get Message from RabbitMQ */
return IntegrationFlows.from(Amqp.inboundAdapter(container))
        .handle(msg ->
        {
            String msgString = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
            HttpEntity<String> requestBody = new HttpEntity<String>(msgString, headers);
            restTemplate.postForObject(ENDPOINT_LOCAL_URL, requestBody, String.class);
            System.out.println(msgString);
        })
        .get();
You don't need to use manual acknowledge mode for this use case; if the REST call returns normally, the container will ack the message; if an exception is thrown, the container will nack the message and it will be redelivered.
If you use manual acks, the Channel and deliveryTag are available in the AmqpHeaders.CHANNEL and AmqpHeaders.DELIVERY_TAG message headers, and you can call basicAck or basicReject on the channel (you will have to add an error channel to the inbound adapter to handle errors).
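The manual-ack decision described above can be sketched roughly as follows. This is a minimal, dependency-free model rather than actual Spring Integration code: `AckChannel`, `RestCall`, `RecordingChannel` and `ManualAckSketch` are hypothetical names made up for illustration. `AckChannel` only mirrors the `basicAck(long, boolean)` and `basicReject(long, boolean)` methods of the RabbitMQ `Channel`; in a real flow, the channel and delivery tag would be read from the `AmqpHeaders.CHANNEL` and `AmqpHeaders.DELIVERY_TAG` headers instead of being passed in by hand.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the two com.rabbitmq.client.Channel methods used here.
interface AckChannel {
    void basicAck(long deliveryTag, boolean multiple) throws IOException;
    void basicReject(long deliveryTag, boolean requeue) throws IOException;
}

// Hypothetical stand-in for the REST call; assumed to throw on an error response.
interface RestCall {
    void post(String body) throws Exception;
}

// Test double that records which acknowledgement was sent.
class RecordingChannel implements AckChannel {
    final List<String> calls = new ArrayList<>();

    @Override
    public void basicAck(long deliveryTag, boolean multiple) {
        calls.add("ack:" + deliveryTag);
    }

    @Override
    public void basicReject(long deliveryTag, boolean requeue) {
        calls.add("reject:" + deliveryTag + " requeue=" + requeue);
    }
}

class ManualAckSketch {

    // Acks the delivery if the REST call succeeds; otherwise rejects it and asks for a requeue.
    static void handle(AckChannel channel, long deliveryTag, String body, RestCall call)
            throws IOException {
        boolean succeeded;
        try {
            call.post(body);
            succeeded = true;
        } catch (Exception e) {
            succeeded = false;
        }
        if (succeeded) {
            channel.basicAck(deliveryTag, false);   // false: ack only this delivery
        } else {
            channel.basicReject(deliveryTag, true); // true: put the message back on the queue
        }
    }

    public static void main(String[] args) throws IOException {
        RecordingChannel channel = new RecordingChannel();
        handle(channel, 1L, "{\"id\":1}", body -> { });
        handle(channel, 2L, "{\"id\":2}", body -> { throw new IllegalStateException("HTTP 500"); });
        System.out.println(channel.calls);
    }
}
```

Passing `false` as the second argument to `basicReject` would instead discard the message, or dead-letter it if the queue is configured with a dead-letter exchange, which may be preferable when the endpoint keeps failing for the same payload.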