Tags: spring-integration, spring-cloud, spring-cloud-stream, spring-amqp, spring-rabbit

Spring Cloud Stream - Functions - How to manually acknowledge rabbitmq message?


I'm using Spring Cloud Stream with the RabbitMQ binder.

With @StreamListener, I could manually acknowledge RabbitMQ messages by having the Channel and the delivery tag injected into the method as follows:

 @StreamListener(target = MySink.INPUT1)
 public void listenForInput1(Message<String> message,
      @Header(AmqpHeaders.CHANNEL) Channel channel,
      @Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag) throws IOException {

    log.info(" received new message [" + message.toString() + "] ");
    channel.basicAck(deliveryTag, false);
 }

I am now trying to achieve the same using functions:

 @Bean
 public Consumer<Message<String>> sink1() {
    return message -> {
      System.out.println("******************");
      System.out.println("At Sink1");
      System.out.println("******************");
      System.out.println("Received message " + message.getPayload());
    };
  }

How do I get the Channel object here so that I can acknowledge the message with the delivery tag? I am able to get the delivery tag from the headers, but I am unable to get the Channel object.


Solution

  • I was able to figure it out:

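      // Needs com.rabbitmq.client.Channel, org.springframework.amqp.support.AmqpHeaders,
      // org.springframework.messaging.Message, java.util.function.Consumer and java.io.IOException.
      @Bean
      public Consumer<Message<String>> sink1() {
        return message -> {
          System.out.println("******************");
          System.out.println("At Sink1");
          System.out.println("******************");
          System.out.println("Received message " + message.getPayload());

          // Read the consuming channel and the delivery tag from the message headers.
          Channel channel = message.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class);
          Long deliveryTag = message.getHeaders().get(AmqpHeaders.DELIVERY_TAG, Long.class);

          // Guard against missing headers to avoid a NullPointerException.
          if (channel == null || deliveryTag == null) {
            System.err.println("No channel or delivery tag header; cannot acknowledge");
            return;
          }

          try {
            // false = acknowledge only this delivery, not all earlier ones.
            channel.basicAck(deliveryTag, false);
          } catch (IOException e) {
            e.printStackTrace();
          }
        };
      }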
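
    Note that this relies on the binding running in manual acknowledgement mode; with the binder's default (AUTO), the listener container acknowledges the delivery itself. A minimal configuration sketch, assuming the binding keeps the default functional name sink1-in-0 (adjust the name if the binding has been renamed):

      # application.properties
      # Assumption: "sink1-in-0" is the default input binding name for the sink1 function.
      spring.cloud.stream.rabbit.bindings.sink1-in-0.consumer.acknowledge-mode=MANUAL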