Tags: ruby-on-rails, ruby, apache-kafka, kafka-producer-api, ruby-kafka

How can I check whether ruby-kafka retries work?


The documentation says that the producer retries sending a message to Kafka up to max_retries times.

So I shut down Kafka and then ran my producer. I got this error:

Fetching cluster metadata from kafka://localhost:9092
[topic_metadata] Opening connection to localhost:9092 with client id MYCLIENTID
ERROR -- : [topic_metadata] Failed to connect to localhost:9092: Connection refused
DEBUG -- : Closing socket to localhost:9092
ERROR -- : Failed to fetch metadata from kafka://localhost:9092
Completed 500 Internal Server Error in 486ms (ActiveRecord: 33.9ms)

That makes sense, but no retries happen after that. I have read the docs inside out and I can't figure out how the retries are actually triggered.

Here is my code:

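    def self.deliver_message(kafka, message, topic, transactional_id)
      producer = kafka.producer(idempotent: true,
                                transactional_id: transactional_id,
                                required_acks: :all,
                                max_retries: 5,   # retries after the first attempt
                                retry_backoff: 5) # seconds to wait between retries
      producer.produce(message, topic: topic)
      producer.deliver_messages
    ensure
      producer&.shutdown # release the producer's broker connections
    end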
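To see what these settings should cost when retries do kick in, here is a small back-of-the-envelope helper. It assumes ruby-kafka's documented semantics as I understand them: max_retries does not count the original attempt, and retry_backoff is the number of seconds slept before each retry. retry_budget is a hypothetical helper, not part of ruby-kafka.

```ruby
# Hypothetical helper (not part of ruby-kafka): worst-case cost of a delivery
# that fails on every attempt, assuming max_retries excludes the first attempt
# and retry_backoff is a sleep in seconds before each retry.
def retry_budget(max_retries:, retry_backoff:)
  attempts = max_retries + 1                  # original attempt + retries
  sleep_seconds = max_retries * retry_backoff # one backoff before each retry
  { attempts: attempts, sleep_seconds: sleep_seconds }
end

budget = retry_budget(max_retries: 5, retry_backoff: 5)
puts "up to #{budget[:attempts]} attempts, #{budget[:sleep_seconds]}s of backoff"
# prints "up to 6 attempts, 25s of backoff"
```

If a retriable error occurred on every attempt, the request would therefore block for roughly 25 seconds plus network time before giving up. The request in the log completed in 486 ms, which suggests the retry loop was never entered at all.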

Link to the docs:

https://www.rubydoc.info/gems/ruby-kafka/Kafka/Producer#initialize-instance_method

Thank you in advance.


Solution

  • Whether a failed send is retried depends on the type of exception raised. The Kafka producer Callback docs (written for the Java client) list the following exceptions that can occur during the callback:

    The exception thrown during processing of this record. Null if no error occurred. Possible thrown exceptions include:

    Non-Retriable exceptions (fatal, the message will never be sent):

    • InvalidTopicException
    • OffsetMetadataTooLargeException
    • RecordBatchTooLargeException
    • RecordTooLargeException
    • UnknownServerException

    Retriable exceptions (transient, may be covered by increasing the retries setting):

    • CorruptRecordException
    • InvalidMetadataException
    • NotEnoughReplicasAfterAppendException
    • NotEnoughReplicasException
    • OffsetOutOfRangeException
    • TimeoutException
    • UnknownTopicOrPartitionException
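To make the distinction concrete, here is a self-contained simulation of that policy: retriable errors are retried up to max_retries times with a backoff in between, while non-retriable errors fail on the first attempt. The error classes are hypothetical Ruby stand-ins named after two entries in the lists above; they are not ruby-kafka's or the Java client's exception hierarchy, and deliver_with_retries is likewise a hypothetical helper.

```ruby
# Hypothetical stand-ins for the two categories listed above.
class RetriableError < StandardError; end
class NonRetriableError < StandardError; end

# Named after entries in the lists above (illustration only).
class NotEnoughReplicas < RetriableError; end
class RecordTooLarge < NonRetriableError; end

# Runs the block until it succeeds, retrying only RetriableError.
# Returns [result, attempts]. sleeper is injectable so tests need not sleep.
def deliver_with_retries(max_retries:, retry_backoff:, sleeper: method(:sleep))
  attempts = 0
  begin
    attempts += 1
    result = yield(attempts)
    [result, attempts]
  rescue RetriableError
    raise if attempts > max_retries # retries exhausted: give up
    sleeper.call(retry_backoff)
    retry
  end
  # NonRetriableError (and anything else) is not rescued, so it
  # propagates on the first attempt without any retry.
end

# A transient error that clears on the third attempt.
result, attempts = deliver_with_retries(max_retries: 5, retry_backoff: 5,
                                        sleeper: ->(_s) {}) do |attempt|
  raise NotEnoughReplicas, "replicas unavailable" if attempt < 3
  :delivered
end
puts "#{result} after #{attempts} attempts" # prints "delivered after 3 attempts"

# A fatal error is raised immediately, regardless of max_retries.
begin
  deliver_with_retries(max_retries: 5, retry_backoff: 5, sleeper: ->(_s) {}) do
    raise RecordTooLarge, "message too big"
  end
rescue RecordTooLarge => e
  puts "gave up immediately: #{e.message}"
end
```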

    Shutting down Kafka completely looks more like a non-retriable exception, so the producer gives up instead of retrying.
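If a full outage is treated as non-retriable, one option for surviving a broker that is down for a while is to retry the whole delivery at the application level. The sketch below is a generic helper with exponential backoff; with_outage_retries is hypothetical, and which exception classes to pass is up to you. With ruby-kafka you would most likely pass Kafka::DeliveryFailed and Kafka::ConnectionError, but check which exception your version actually raises, since the 500 in the log does not show its class.

```ruby
# Hypothetical application-level retry helper (not part of ruby-kafka).
# Retries the block when one of the given exception classes is raised,
# doubling the wait each time, up to max_attempts total attempts.
def with_outage_retries(errors:, max_attempts: 4, base_delay: 1.0, sleeper: method(:sleep))
  attempt = 0
  begin
    attempt += 1
    yield attempt
  rescue *errors => e
    raise if attempt >= max_attempts        # out of attempts: surface the error
    delay = base_delay * (2**(attempt - 1)) # 1.0s, 2.0s, 4.0s, ...
    warn "attempt #{attempt} failed (#{e.class}); retrying in #{delay}s"
    sleeper.call(delay)
    retry
  end
end

# Demo with a simulated outage that clears on the third attempt.
class BrokerDown < StandardError; end

delays = []
result = with_outage_retries(errors: [BrokerDown], sleeper: ->(d) { delays << d }) do |attempt|
  raise BrokerDown, "Connection refused" if attempt < 3
  :delivered
end
p result # prints :delivered
p delays # prints [1.0, 2.0]
```

With ruby-kafka, the helper would wrap only producer.deliver_messages, not producer.produce. My assumption is that messages that failed to deliver stay in the producer's buffer, so calling deliver_messages again resends them, whereas calling produce again would enqueue a duplicate.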