How to set ID header in Spring Integration Kafka Message?


I have a demo Spring Integration project that receives Kafka messages, aggregates them, and then releases them. I'm trying to add a JdbcMessageStore to the project, but it fails with this error:

Caused by: java.lang.IllegalArgumentException: Cannot store messages without an ID header
    at org.springframework.util.Assert.notNull(Assert.java:201) ~[spring-core-5.2.15.RELEASE.jar:5.2.15.RELEASE]
    at org.springframework.integration.jdbc.store.JdbcMessageStore.addMessage(JdbcMessageStore.java:314) ~[spring-integration-jdbc-5.3.8.RELEASE.jar:5.3.8.RELEASE]

After debugging, I found that the store requires a UUID id header on the message. The problem is that I can't set the id header manually on the Kafka side: it is forbidden (as is the timestamp header). I tried doing this in a Kafka producer in a different project.

If I send a message from the IDEA plugin Big Data Tools, I can set the id header, but my project receives it as a byte array and fails with this error:

IllegalArgumentException Incorrect type specified for header 'id'. Expected [UUID] but actual type is [B]

I can't find a way to resolve this. I need to set this id header somehow so that the messages can be stored in the database. Thanks in advance.


Solution

  • The KafkaMessageDrivenChannelAdapter has an option:

    /**
     * Set the message converter to use with a record-based consumer.
     * @param messageConverter the converter.
     */
    public void setRecordMessageConverter(RecordMessageConverter messageConverter) {
    

    There you can set a MessagingMessageConverter, which has these two options:

    /**
     * Generate {@link Message} {@code ids} for produced messages. If set to {@code false},
     * will try to use a default value. By default set to {@code false}.
     * @param generateMessageId true if a message id should be generated
     */
    public void setGenerateMessageId(boolean generateMessageId) {
        this.generateMessageId = generateMessageId;
    }
    
    /**
     * Generate {@code timestamp} for produced messages. If set to {@code false}, -1 is
     * used instead. By default set to {@code false}.
     * @param generateTimestamp true if a timestamp should be generated
     */
    public void setGenerateTimestamp(boolean generateTimestamp) {
        this.generateTimestamp = generateTimestamp;
    }
    

    Set both options to true.

    This way, the Message created from a ConsumerRecord will have the respective id and timestamp headers.

    Alternatively, you can simply add a "dummy" transformer that returns the incoming payload; the framework then creates a new Message in which those headers are generated.
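Both options described above might be wired roughly as follows. This is only a sketch under assumptions: the listener container bean, the String key/value types, the class names, and the channel names (fromKafka, aggregatorInputChannel) are hypothetical and not taken from the question. Only the setter names come from the excerpts quoted in the answer. You would normally pick one of the two options, not both.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.messaging.MessageChannel;

// Option 1: let the record converter generate the id and timestamp headers.
@Configuration
public class KafkaInboundConfig {

    // Assumption: a listener container bean and a channel bean named
    // "aggregatorInputChannel" are defined elsewhere in the application.
    @Bean
    public KafkaMessageDrivenChannelAdapter<String, String> kafkaInboundAdapter(
            AbstractMessageListenerContainer<String, String> container,
            MessageChannel aggregatorInputChannel) {

        MessagingMessageConverter converter = new MessagingMessageConverter();
        converter.setGenerateMessageId(true);   // adds a UUID "id" header
        converter.setGenerateTimestamp(true);   // adds a "timestamp" header

        KafkaMessageDrivenChannelAdapter<String, String> adapter =
                new KafkaMessageDrivenChannelAdapter<>(container);
        adapter.setRecordMessageConverter(converter);
        adapter.setOutputChannel(aggregatorInputChannel);
        return adapter;
    }
}

// Option 2 (alternative to the converter settings): a pass-through transformer.
// Because the method returns a payload rather than a Message, the framework
// builds a new Message around it, and that new Message gets its own id and
// timestamp headers.
@MessageEndpoint
class PassThroughTransformer {

    // Assumption: the inbound adapter sends to "fromKafka" in this variant.
    @Transformer(inputChannel = "fromKafka", outputChannel = "aggregatorInputChannel")
    public Object passThrough(Object payload) {
        return payload;
    }
}
```

With either option in place, the messages reaching the aggregator should carry a UUID id header, which is what JdbcMessageStore checks for before storing them.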