In the documentation it is mentioned that producer retries to send the message to the queue based on max_retries
.
So I shutdown the Kafka and then tried my producer. I get this error
Fetching cluster metadata from kafka://localhost:9092
[topic_metadata] Opening connection to localhost:9092 with client id MYCLIENTID
ERROR -- : [topic_metadata] Failed to connect to localhost:9092: Connection refused
DEBUG -- : Closing socket to localhost:9092
ERROR -- : Failed to fetch metadata from kafka://localhost:9092
Completed 500 Internal Server Error in 486ms (ActiveRecord: 33.9ms)
which it make sense, however the retries
never happens after that. I have read the doc inside-out and I can't figure it out how this retries
actually going to trigger?
Here is my code:
def self.deliver_message(kafka, message, topic, transactional_id)
producer = kafka.producer(idempotent: true,
transactional_id: transactional_id,
required_acks: :all,
max_retries: 5,
retry_backoff: 5)
producer.produce(message, topic: topic)
producer.deliver_messages
end
link to doc:
https://www.rubydoc.info/gems/ruby-kafka/Kafka/Producer#initialize-instance_method
Thank you in advance.
The retries are based on the type of Exception thrown by the producer callback. According to the Callback Docs there are the following Exception possible happening during callback:
The exception thrown during processing of this record. Null if no error occurred. Possible thrown exceptions include:
Non-Retriable exceptions (fatal, the message will never be sent):
Retriable exceptions (transient, may be covered by increasing #.retries):
Shutting down Kafka completely rather looks like a non-retriable Exception.