I am using an @InboundChannelAdapter to consume messages from Kafka. The messages produced by KafkaMessageSource do not have an Id, which makes it impossible to insert them into a channel backed by JdbcChannelMessageStore.
Can this behavior be configured?
I tried going through a messageTransformer to build a new message from the original one, but again no Id is generated.
Add a custom MessagingMessageConverter to the source:
/**
 * Set the message converter to replace the default
 * {@link MessagingMessageConverter}.
 * @param messageConverter the converter.
 */
public void setMessageConverter(RecordMessageConverter messageConverter) {
    this.messageConverter = messageConverter;
}
and call setGenerateMessageId(true) (and, optionally, setGenerateTimestamp) on the converter.
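Putting that together, a minimal configuration sketch might look like the following. It assumes a spring-integration-kafka version whose KafkaMessageSource constructor takes a ConsumerFactory and a ConsumerProperties, and that a ConsumerFactory<String, String> bean is already defined elsewhere. The class name KafkaSourceConfig, the channel name "fromKafka", the topic "someTopic", and the poller delay are placeholders, not taken from the question.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.kafka.inbound.KafkaMessageSource;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConsumerProperties;
import org.springframework.kafka.support.converter.MessagingMessageConverter;

@Configuration
public class KafkaSourceConfig {

    // "fromKafka" and "someTopic" are hypothetical names used for illustration only.
    @Bean
    @InboundChannelAdapter(channel = "fromKafka", poller = @Poller(fixedDelay = "5000"))
    public KafkaMessageSource<String, String> source(ConsumerFactory<String, String> cf) {
        KafkaMessageSource<String, String> source =
                new KafkaMessageSource<>(cf, new ConsumerProperties("someTopic"));

        // Replace the default converter with one that populates the id header.
        MessagingMessageConverter converter = new MessagingMessageConverter();
        converter.setGenerateMessageId(true);
        // Optional: also populate the timestamp header.
        converter.setGenerateTimestamp(true);

        source.setMessageConverter(converter);
        return source;
    }
}
```

With the converter configured this way, each polled message should carry a generated id, so it can be stored in a channel backed by JdbcChannelMessageStore.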