I have a demo Spring Integration project that receives Kafka messages, aggregates them, and then releases them. I'm trying to add a JdbcMessageStore to the project, but it fails with this error:
Caused by: java.lang.IllegalArgumentException: Cannot store messages without an ID header
at org.springframework.util.Assert.notNull(Assert.java:201) ~[spring-core-5.2.15.RELEASE.jar:5.2.15.RELEASE]
at org.springframework.integration.jdbc.store.JdbcMessageStore.addMessage(JdbcMessageStore.java:314) ~[spring-integration-jdbc-5.3.8.RELEASE.jar:5.3.8.RELEASE]
After debugging, I found that the store requires a UUID id header on the message. The problem is that I can't set the id header manually on the Kafka side: it is forbidden (the same as the timestamp header). I tried to do this in a Kafka producer in a different project.
If I send a message from the IDEA plugin named Big Data Tools, I am able to set the id header, but my project receives it as an array of bytes and fails with this error:
IllegalArgumentException Incorrect type specified for header 'id'. Expected [UUID] but actual type is [B]
I can't find a solution to this issue. I need to set this id header somehow to be able to store messages in the database.
Thanks in advance
The KafkaMessageDrivenChannelAdapter has an option:
/**
 * Set the message converter to use with a record-based consumer.
 * @param messageConverter the converter.
 */
public void setRecordMessageConverter(RecordMessageConverter messageConverter) {
There you can set a MessagingMessageConverter with:
/**
 * Generate {@link Message} {@code ids} for produced messages. If set to {@code false},
 * will try to use a default value. By default set to {@code false}.
 * @param generateMessageId true if a message id should be generated
 */
public void setGenerateMessageId(boolean generateMessageId) {
    this.generateMessageId = generateMessageId;
}

/**
 * Generate {@code timestamp} for produced messages. If set to {@code false}, -1 is
 * used instead. By default set to {@code false}.
 * @param generateTimestamp true if a timestamp should be generated
 */
public void setGenerateTimestamp(boolean generateTimestamp) {
    this.generateTimestamp = generateTimestamp;
}
set to true.
This way, the Message created from a ConsumerRecord will have the respective id and timestamp headers.
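A minimal configuration sketch of the converter approach follows. It assumes String keys and values, a listener container bean already defined elsewhere in the application, and a channel bean named aggregatorInput that feeds the aggregator. The class name, bean names, and generic types are hypothetical, so adjust them to your project.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.messaging.MessageChannel;

@Configuration
public class KafkaInboundConfig {

    // "container" and "aggregatorInput" are assumed to be beans defined elsewhere.
    @Bean
    public KafkaMessageDrivenChannelAdapter<String, String> kafkaInboundAdapter(
            AbstractMessageListenerContainer<String, String> container,
            MessageChannel aggregatorInput) {

        // Ask the converter to generate the id and timestamp headers
        // instead of leaving them unset (both options default to false).
        MessagingMessageConverter converter = new MessagingMessageConverter();
        converter.setGenerateMessageId(true);
        converter.setGenerateTimestamp(true);

        KafkaMessageDrivenChannelAdapter<String, String> adapter =
                new KafkaMessageDrivenChannelAdapter<>(container);
        adapter.setRecordMessageConverter(converter);
        adapter.setOutputChannel(aggregatorInput);
        return adapter;
    }
}
```

With this in place, the id header is generated on the consumer side, so nothing needs to be set by the Kafka producer.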
You can also simply add a "dummy" transformer that returns the incoming payload; the framework will then create a new Message in which those headers are generated.
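The transformer variant could look roughly like the sketch below. It assumes annotation-based configuration is enabled; the channel names fromKafka and aggregatorInput and the class name are hypothetical placeholders for the channels between your Kafka adapter and the aggregator.

```java
import org.springframework.integration.annotation.Transformer;
import org.springframework.stereotype.Component;

@Component
public class PassThroughTransformer {

    // Returns the payload unchanged. Because only the payload is returned,
    // the framework builds a new Message around it, and that new Message
    // gets freshly generated id and timestamp headers.
    @Transformer(inputChannel = "fromKafka", outputChannel = "aggregatorInput")
    public Object passThrough(Object payload) {
        return payload;
    }
}
```

This costs one extra hop in the flow but leaves the Kafka adapter configuration untouched.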