spring-integration, spring-kafka

Why does KafkaMessageSource generate messages without an Id?


I am using an @InboundChannelAdapter to consume messages from Kafka. The messages produced by KafkaMessageSource do not have an Id, which makes it impossible to send them to a channel backed by JdbcChannelMessageStore.

Can this behavior be configured?

I tried going through a messageTransformer to build a new message from the original one, but again no Id is generated.


Solution

  • Set a custom MessagingMessageConverter on the source, using this setter on KafkaMessageSource:

    /**
     * Set the message converter to replace the default
     * {@link MessagingMessageConverter}.
     * @param messageConverter the converter.
     */
    public void setMessageConverter(RecordMessageConverter messageConverter) {
        this.messageConverter = messageConverter;
    }
    

    and call setGenerateMessageId(true) (and, optionally, setGenerateTimestamp(true)) on the converter before passing it to the source.
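
    For illustration, the wiring might look roughly like the sketch below. The class name KafkaSourceConfig, the channel name "fromKafka", the topic "myTopic", the String key/value types and the poller delay are all placeholders, not taken from the question; adjust them to your setup. It assumes a ConsumerFactory bean is already defined elsewhere.

    ```java
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.integration.annotation.InboundChannelAdapter;
    import org.springframework.integration.annotation.Poller;
    import org.springframework.integration.kafka.inbound.KafkaMessageSource;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.listener.ConsumerProperties;
    import org.springframework.kafka.support.converter.MessagingMessageConverter;

    @Configuration
    public class KafkaSourceConfig { // hypothetical class name

        @Bean
        @InboundChannelAdapter(channel = "fromKafka", poller = @Poller(fixedDelay = "5000"))
        public KafkaMessageSource<String, String> kafkaSource(ConsumerFactory<String, String> consumerFactory) {
            // Converter that adds the id (and timestamp) headers to each message.
            MessagingMessageConverter converter = new MessagingMessageConverter();
            converter.setGenerateMessageId(true);
            converter.setGenerateTimestamp(true); // optional

            // "myTopic" is a placeholder topic name.
            KafkaMessageSource<String, String> source =
                    new KafkaMessageSource<>(consumerFactory, new ConsumerProperties("myTopic"));
            // Replace the default converter with the one configured above.
            source.setMessageConverter(converter);
            return source;
        }
    }
    ```

    With the converter configured this way, messages emitted by the source should carry an Id header, so a downstream channel backed by JdbcChannelMessageStore can persist them.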