Tags: spring, apache-kafka, spring-integration, spring-kafka

How to read the offset after delivering a message using the Kafka outbound channel adapter


I am using the Kafka outbound channel adapter to send messages. Is there any way to get the offset after a message has been delivered? Below is my configuration:

<int-kafka:outbound-channel-adapter id="kafkaCommonOutboundChannelAdapter"
                                    kafka-template="kafkaTemplate"
                                    header-mapper="kafkaHeaderMapper"
                                    auto-startup="true"
                                    topic-expression="headers['topic']"
                                    partition-id-expression="headers['partition']"
                                    sync="true">
    <int-kafka:request-handler-advice-chain>
        <ref bean="requestHandlerAdvice"/>
        <ref bean="retryAdvice"/>
    </int-kafka:request-handler-advice-chain>
</int-kafka:outbound-channel-adapter>

<bean id="requestHandlerAdvice"
      class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice">
    <property name="trapException" value="true"/>
    <property name="onSuccessExpression" ref="success"/>
    <property name="successChannelName" value="successChannel"/>
    <property name="onFailureExpression" ref="failure"/>
    <property name="failureChannelName" value="failureChannel"/>
</bean>

Thanks


Solution

  • The KafkaProducerMessageHandler has a sendSuccessChannel option:

    /**
     * Set the success channel.
     * @param sendSuccessChannel the Success channel.
     */
    public void setSendSuccessChannel(MessageChannel sendSuccessChannel) {
    

    This channel is used at the end of the send, after the record has been produced successfully:

    processSendResult(message, producerRecord, sendFuture, getSendSuccessChannel());
    

    and does this:

                    if (metadataChannel != null) {
                        KafkaProducerMessageHandler.this.messagingTemplate.send(metadataChannel,
                                getMessageBuilderFactory()
                                        .fromMessage(message)
                                        .setHeader(KafkaHeaders.RECORD_METADATA, sendResult.getRecordMetadata())
                                        .build());
                    }
    

    Note the KafkaHeaders.RECORD_METADATA header: it carries the RecordMetadata object, from which you can read the offset.

    See the corresponding XML attribute:

        <xsd:attribute name="send-success-channel" type="xsd:string">
            <xsd:annotation>
                <xsd:documentation><![CDATA[
                    Specifies the channel to which message with a payload of type
                    'org.apache.kafka.clients.producer.RecordMetadata' will be sent
                    after a successful send.
                ]]></xsd:documentation>
                <xsd:appinfo>
                    <tool:annotation kind="ref">
                        <tool:expected-type type="org.springframework.messaging.MessageChannel" />
                    </tool:annotation>
                </xsd:appinfo>
            </xsd:annotation>
        </xsd:attribute>
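
    Applied to the adapter from the question, this could look roughly like the sketch below. It is an illustration, not tested configuration: the channel name kafkaSendSuccessChannel is made up, the int: prefix is assumed to be bound to the core Spring Integration namespace, and 'kafka_recordMetadata' is assumed to be the string value of KafkaHeaders.RECORD_METADATA in your version. The SpEL expression calls offset() on the RecordMetadata found in that header, as in the handler code quoted above.

    ```xml
    <!-- Same adapter as in the question, with send-success-channel added.
         The advice chain from the question can stay as a child element; it is left out here for brevity. -->
    <int-kafka:outbound-channel-adapter id="kafkaCommonOutboundChannelAdapter"
                                        kafka-template="kafkaTemplate"
                                        header-mapper="kafkaHeaderMapper"
                                        auto-startup="true"
                                        topic-expression="headers['topic']"
                                        partition-id-expression="headers['partition']"
                                        sync="true"
                                        send-success-channel="kafkaSendSuccessChannel"/>

    <!-- Hypothetical channel name, used only for this example -->
    <int:channel id="kafkaSendSuccessChannel"/>

    <!-- Logs the offset taken from the RecordMetadata header.
         'kafka_recordMetadata' is assumed to match KafkaHeaders.RECORD_METADATA. -->
    <int:logging-channel-adapter channel="kafkaSendSuccessChannel"
                                 level="INFO"
                                 expression="'Sent to offset ' + headers['kafka_recordMetadata'].offset()"/>
    ```

    Instead of logging, the same channel could feed a service activator that stores the offset; the message arriving there is the original message with the metadata header added.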