I am using the Kafka outbound channel adapter to send messages. Is there any way to get the offset after a message has been delivered? Below is my configuration:
<int-kafka:outbound-channel-adapter id="kafkaCommonOutboundChannelAdapter"
kafka-template="kafkaTemplate"
header-mapper="kafkaHeaderMapper"
auto-startup="true"
topic-expression="headers['topic']"
partition-id-expression="headers['partition']"
sync="true">
<int-kafka:request-handler-advice-chain>
<ref bean="requestHandlerAdvice"/>
<ref bean="retryAdvice"/>
</int-kafka:request-handler-advice-chain>
</int-kafka:outbound-channel-adapter>
<bean id="requestHandlerAdvice"
class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice">
<property name="trapException" value="true"/>
<property name="onSuccessExpression" ref="success"/>
<property name="successChannelName" value="successChannel"/>
<property name="onFailureExpression" ref="failure"/>
<property name="failureChannelName" value="failureChannel"/>
</bean>
Thanks
The KafkaProducerMessageHandler comes with an option like this:
/**
* Set the success channel.
* @param sendSuccessChannel the Success channel.
*/
public void setSendSuccessChannel(MessageChannel sendSuccessChannel) {
This channel is used at the end, after the record has been produced successfully:
processSendResult(message, producerRecord, sendFuture, getSendSuccessChannel());
and does this:
if (metadataChannel != null) {
KafkaProducerMessageHandler.this.messagingTemplate.send(metadataChannel,
getMessageBuilderFactory()
.fromMessage(message)
.setHeader(KafkaHeaders.RECORD_METADATA, sendResult.getRecordMetadata())
.build());
}
Pay attention to the KafkaHeaders.RECORD_METADATA header: it carries the RecordMetadata object, from which you can read the offset.
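As a rough sketch, the offset could be read from that header with a SpEL expression on whatever consumes the success channel. The channel name kafkaSendSuccessChannel is hypothetical, the int prefix is assumed to be bound to the core Spring Integration namespace, and 'kafka_recordMetadata' is the string value I would expect behind KafkaHeaders.RECORD_METADATA (worth checking against your version):

```xml
<!-- Hypothetical channel that receives a message after each successful send -->
<int:channel id="kafkaSendSuccessChannel"/>

<!-- Logs topic, partition and offset taken from the RecordMetadata header.
     The header name 'kafka_recordMetadata' is an assumption; verify it
     against KafkaHeaders.RECORD_METADATA in your version. -->
<int:logging-channel-adapter channel="kafkaSendSuccessChannel" level="INFO"
    expression="'Sent to ' + headers['kafka_recordMetadata'].topic()
        + '-' + headers['kafka_recordMetadata'].partition()
        + ' at offset ' + headers['kafka_recordMetadata'].offset()"/>
```

A service activator or transformer with the same expression would work equally well if you need the offset for further processing rather than just logging.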
See the corresponding XML attribute:
<xsd:attribute name="send-success-channel" type="xsd:string">
<xsd:annotation>
<xsd:documentation><![CDATA[
Specifies the channel to which message with a payload of type
'org.apache.kafka.clients.producer.RecordMetadata' will be sent
after a successful send.
]]></xsd:documentation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type type="org.springframework.messaging.MessageChannel" />
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>
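Applied to the adapter from the question, the attribute might look like the sketch below. It assumes your version of the Kafka namespace already supports send-success-channel; the channel name kafkaSendSuccessChannel is hypothetical and must refer to a MessageChannel bean you define yourself:

```xml
<int-kafka:outbound-channel-adapter id="kafkaCommonOutboundChannelAdapter"
        kafka-template="kafkaTemplate"
        header-mapper="kafkaHeaderMapper"
        auto-startup="true"
        topic-expression="headers['topic']"
        partition-id-expression="headers['partition']"
        sync="true"
        send-success-channel="kafkaSendSuccessChannel">
    <!-- Existing advice chain from the question, unchanged -->
    <int-kafka:request-handler-advice-chain>
        <ref bean="requestHandlerAdvice"/>
        <ref bean="retryAdvice"/>
    </int-kafka:request-handler-advice-chain>
</int-kafka:outbound-channel-adapter>
```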