java, apache-kafka, apache-storm, kafka-producer-api

Exposing Kafka publishing exceptions in KafkaBolt


I use KafkaBolt in Storm to publish messages to various Kafka topics. I want to add logging and metrics around the publishing logic so that I can alert on any exception thrown when publishing fails. Kafka exposes those exceptions through the Callback passed into KafkaProducer.send(), which is invoked once publishing succeeds or fails.
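For readers unfamiliar with the pattern, here is a minimal sketch of a completion callback that counts successes and failures. So that it compiles without Kafka on the classpath, Callback and StubProducer below are hypothetical stand-ins: they only mirror the shape of Kafka's Callback.onCompletion(metadata, exception), where the exception is null on success, and the metadata is reduced to a String. CountingCallback is likewise an illustrative name, not part of any library.

```java
import java.util.concurrent.atomic.AtomicInteger;

class SendCallbackSketch {
    /** Stand-in with the same shape as Kafka's Callback: exception is null on success. */
    interface Callback {
        void onCompletion(String metadata, Exception exception);
    }

    /** Hypothetical stub producer: completes synchronously and fails when the topic is empty. */
    static class StubProducer {
        void send(String topic, String value, Callback callback) {
            if (topic.isEmpty()) {
                callback.onCompletion(null, new IllegalStateException("no topic for value " + value));
            } else {
                callback.onCompletion(topic + "-0", null);
            }
        }
    }

    /** Counts outcomes; real code would log and update a metrics registry here instead. */
    static class CountingCallback implements Callback {
        final AtomicInteger successes = new AtomicInteger();
        final AtomicInteger failures = new AtomicInteger();
        volatile Exception lastError;

        @Override
        public void onCompletion(String metadata, Exception exception) {
            if (exception == null) {
                successes.incrementAndGet();
            } else {
                failures.incrementAndGet();
                lastError = exception;
            }
        }
    }

    public static void main(String[] args) {
        StubProducer producer = new StubProducer();
        CountingCallback callback = new CountingCallback();
        producer.send("events", "hello", callback); // succeeds in the stub
        producer.send("", "hello", callback);       // fails in the stub
        System.out.println("successes=" + callback.successes + " failures=" + callback.failures);
    }
}
```

With the real KafkaProducer the callback generally runs on the producer's I/O thread, so whatever goes inside it (logging, counters) should stay cheap.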

The problem is that KafkaBolt completely encapsulates its KafkaProducer, so there's no way to inject a custom Callback; if I want to see any errors, I have to look in the Storm UI. I've worked around this by creating a wrapper for KafkaBolt. The wrapper, in turn, wraps the OutputCollector passed into KafkaBolt.prepare() in a custom OutputCollector that overrides OutputCollector.reportError(). I can add my own logging and metrics reporting code there and then delegate to the original method.
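The workaround described above can be sketched roughly as follows. This is not the actual wrapper: to keep it compilable without Storm on the classpath, Collector and PublishingBolt are hypothetical stand-ins for OutputCollector and KafkaBolt, and only the reportError(Throwable) method name is taken from the description above. InterceptingCollector, WrappedBolt and the onError hook are illustrative names.

```java
import java.util.function.Consumer;

class CollectorWrapperSketch {
    /** Hypothetical stand-in for Storm's OutputCollector; only reportError is modelled. */
    static class Collector {
        void reportError(Throwable error) {
            System.err.println("reported: " + error);
        }
    }

    /** Runs a custom hook (logging, metrics), then delegates to the original collector. */
    static class InterceptingCollector extends Collector {
        private final Collector delegate;
        private final Consumer<Throwable> onError;

        InterceptingCollector(Collector delegate, Consumer<Throwable> onError) {
            this.delegate = delegate;
            this.onError = onError;
        }

        @Override
        void reportError(Throwable error) {
            onError.accept(error);        // custom logging and metrics go here
            delegate.reportError(error);  // preserve the original behaviour
        }
    }

    /** Hypothetical stand-in for KafkaBolt: reports publish failures via its collector. */
    static class PublishingBolt {
        private Collector collector;

        void prepare(Collector collector) {
            this.collector = collector;
        }

        void execute(String message) {
            if (message.isEmpty()) { // simulated publishing failure
                collector.reportError(new IllegalArgumentException("empty message"));
            }
        }
    }

    /** Wrapper bolt: hands the inner bolt an intercepting collector instead of the real one. */
    static class WrappedBolt {
        private final PublishingBolt inner;
        private final Consumer<Throwable> onError;

        WrappedBolt(PublishingBolt inner, Consumer<Throwable> onError) {
            this.inner = inner;
            this.onError = onError;
        }

        void prepare(Collector collector) {
            inner.prepare(new InterceptingCollector(collector, onError));
        }

        void execute(String message) {
            inner.execute(message);
        }
    }
}
```

The hook only sees what the inner bolt chooses to pass to reportError(), so this approach observes failures after the fact rather than at the producer callback itself.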

This solution is perfectly adequate for what I need, but it seems odd that KafkaBolt makes it so difficult to access those exceptions programmatically. Am I missing something obvious, or is there a better way to do this?


Solution

  • I don't think you're missing anything; you're likely just the first person with this need. Someone has to hit this issue and decide to fix it :)

    If you'd like to change the bolt to support custom error handling (e.g. by allowing the user to provide a callback, as you suggest), you can raise an issue at https://issues.apache.org/jira/projects/STORM/issues and open a PR at https://github.com/apache/storm/pulls. You're of course also welcome to only raise the issue, since someone else might see it and decide to fix it, but contributing the fix yourself would likely be faster.

    edit: You can find the bolt code at https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java