spring-cloud-stream, spring-cloud-stream-binder-kafka

How to get the Message headers in a consumer


Using the new function-based programming model of Spring Cloud Stream, I need access to some of the headers of the messages I consume. I figured I would consume Message<MyDto> instead of just MyDto, but when I do, all of MyDto's properties are null.

In broad strokes, this is what I want to do:

@ToString
@Getter
@Setter
public class MyDto {
   private OtherDto otherDto;
}

@Bean
public Consumer<Message<MyDto>> onRawImport() {
    return message -> logger.info("Received {}", message.getPayload());  // <-- prints "Received MyDto(otherDto=null)"
}

whereas the following works perfectly in my configuration:

@Bean
public Consumer<MyDto> onRawImport() {
    return myDto -> logger.info("Received {}", myDto);  // <-- prints "Received MyDto(otherDto=OtherDto(...))"
}

Is there a simple way to consume the Message directly?


Addendum:

If I turn DEBUG on for org.springframework.cloud.function.context.catalog, I see this with Consumer<MyDto>:

BeanFactoryAwareFunctionRegistry : Applying function: onRawImport
BeanFactoryAwareFunctionRegistry : Applying type conversion on input value GenericMessage [payload=byte[288], headers=...snip...]
BeanFactoryAwareFunctionRegistry : Function type: java.util.function.Consumer<MyDto>
BeanFactoryAwareFunctionRegistry : Raw type of value: GenericMessage [payload=byte[288], ...snip...}] is class MyDto
BeanFactoryAwareFunctionRegistry : Converted from Message: MyDto(otherDto=OtherDto(...))
BeanFactoryAwareFunctionRegistry : Converted input value MyDto(otherDto=OtherDto(...))
MyOwnListener : Received MyDto(id=5, message=test)

and this with Consumer<Message<MyDto>>

BeanFactoryAwareFunctionRegistry : Applying function: onRawImport
BeanFactoryAwareFunctionRegistry : Applying type conversion on input value GenericMessage [payload=byte[288], headers=...snip...]
BeanFactoryAwareFunctionRegistry : Function type: java.util.function.Consumer<org.springframework.messaging.Message<MyDto>>
BeanFactoryAwareFunctionRegistry : Raw type of value: GenericMessage [payload=byte[288], ...snip...}] is class MyDto
BeanFactoryAwareFunctionRegistry : Converted from Message: MyDto(otherDto=null)
BeanFactoryAwareFunctionRegistry : Converted input value MyDto(otherDto=null)
MyOwnListener : Received MyDto(otherDto=null)

Solution

  • Which version are you using? I just tested it with Boot 2.4.4 and Spring Cloud 2020.0.2, and it works fine:

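    import java.util.function.Consumer;

    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.messaging.Message;

    @SpringBootApplication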
    public class So66990612Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So66990612Application.class, args);
        }
    
        @Bean
        Consumer<Message<Foo>> input() {
            return System.out::println;
        }
    
    
        @Bean
        public ApplicationRunner runner(RabbitTemplate template) {
            return args -> {
                template.convertAndSend("input-in-0", "x", "{\"bar\":\"baz\"}",
                        msg -> {
                            msg.getMessageProperties().setContentType("application/json");
                            return msg;
                        });
            };
        }
    
        public static class Foo {
    
            private String bar;
    
            public String getBar() {
                return this.bar;
            }
    
            public void setBar(String bar) {
                this.bar = bar;
            }
    
            @Override
            public String toString() {
                return "Foo [bar=" + this.bar + "]";
            }
    
        }
    
    }
    
    GenericMessage [payload=Foo [bar=baz], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=input-in-0, amqp_deliveryTag=1, deliveryAttempt=1, amqp_consumerQueue=input-in-0.anonymous.Uh_s89lKRnKeJ3ls991pXA, amqp_redelivered=false, amqp_receivedRoutingKey=x, amqp_contentEncoding=UTF-8, id=1a848cf6-3f85-c017-fa70-d52f43c0fc67, amqp_consumerTag=amq.ctag-w6ZyXBGOtC-q7rCY8Jy-gA, sourceData=(Body:'{"bar":"baz"}' MessageProperties [headers={}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=input-in-0, receivedRoutingKey=x, deliveryTag=1, consumerTag=amq.ctag-w6ZyXBGOtC-q7rCY8Jy-gA, consumerQueue=input-in-0.anonymous.Uh_s89lKRnKeJ3ls991pXA]), contentType=application/json, timestamp=1617888626028}]
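
Once the payload converts correctly, the headers are available from the same Message object through getHeaders(). Below is a minimal sketch of that access pattern, not a definitive implementation. So that it runs without Spring on the classpath, it declares a hypothetical two-method stand-in for org.springframework.messaging.Message. MyDto is simplified to a single String field, and messageOf and the received list are illustrative helpers that belong to no library. In a real application the stand-in would be dropped in favour of the Spring interface, and onRawImport() would be annotated with @Bean.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class HeaderAccessSketch {

    // Hypothetical stand-in for org.springframework.messaging.Message so the
    // sketch compiles without Spring. The real interface exposes the same two
    // accessors, and its MessageHeaders type is also a Map<String, Object>.
    interface Message<T> {
        T getPayload();
        Map<String, Object> getHeaders();
    }

    // Simplified version of the question's DTO (assumption: one String field).
    static class MyDto {
        final String name;

        MyDto(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return "MyDto(name=" + name + ")";
        }
    }

    // Lines produced by the consumer, collected so they can be inspected.
    static final List<String> received = new ArrayList<>();

    // In a Spring Cloud Stream application this method would carry @Bean.
    static Consumer<Message<MyDto>> onRawImport() {
        return message -> {
            // Headers are looked up by name, like entries of any other Map.
            Object contentType = message.getHeaders().get("contentType");
            received.add("contentType=" + contentType + ", payload=" + message.getPayload());
        };
    }

    // Illustrative helper: builds a message by hand. In the real application
    // the binder creates the Message and passes it to the consumer.
    static <T> Message<T> messageOf(T payload, Map<String, Object> headers) {
        return new Message<T>() {
            @Override
            public T getPayload() {
                return payload;
            }

            @Override
            public Map<String, Object> getHeaders() {
                return headers;
            }
        };
    }

    public static void main(String[] args) {
        onRawImport().accept(
                messageOf(new MyDto("test"), Map.of("contentType", "application/json")));
        received.forEach(System.out::println);
    }
}
```

Running main prints `contentType=application/json, payload=MyDto(name=test)`. The "contentType" key is the same header that appears in the GenericMessage output above; binder-specific headers, such as the amqp_* ones shown there, can be read by name in the same way.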