I use KafkaBolt in Storm to publish messages to various Kafka topics. I want to put logging and metrics around the publishing logic so I can create alerts around any exceptions that might be thrown when there's a publishing failure. Exposing those exceptions is done through a Callback function that's passed into KafkaProducer.send(), which is executed after publishing succeeds or fails.
The problem is that KafkaBolt completely encapsulates its KafkaProducer, so there's no way to inject a custom Callback. If I want to see any errors, I have to look in the Storm UI. I've worked around this by creating a wrapper for KafkaBolt. This wrapper, in turn, wraps the OutputCollector passed into KafkaBolt.prepare() in a custom OutputCollector that overrides OutputCollector.reportError(). I can add my own logging and metrics reporting code there and then have it call the original method.
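For readers who want to see the shape of that workaround, here is a minimal, self-contained sketch of the decorator idea. It deliberately does not use the real Storm classes: Collector and PublishingBolt are hypothetical stand-ins for OutputCollector and KafkaBolt, and MeteredCollector and MeteredBolt are illustrative names, so that the example compiles without any dependencies. In a real topology the decorator would presumably extend org.apache.storm.task.OutputCollector and override reportError(Throwable) instead.

```java
import java.util.concurrent.atomic.AtomicLong;

final class ReportErrorInterceptSketch {

    /** Stand-in (assumption) for the slice of Storm's OutputCollector used here. */
    interface Collector {
        void reportError(Throwable error);
    }

    /** Decorator: counts and logs each reported error, then forwards it unchanged. */
    static final class MeteredCollector implements Collector {
        private final Collector delegate;
        private final AtomicLong failures = new AtomicLong();

        MeteredCollector(Collector delegate) {
            this.delegate = delegate;
        }

        @Override
        public void reportError(Throwable error) {
            failures.incrementAndGet();                                         // metrics hook
            System.err.println("Kafka publish failed: " + error.getMessage());  // logging hook
            delegate.reportError(error);                                        // keep the original behavior
        }

        long failureCount() {
            return failures.get();
        }
    }

    /** Stand-in (assumption) for KafkaBolt: it only sees the collector given to prepare(). */
    static final class PublishingBolt {
        private Collector collector;

        void prepare(Collector collector) {
            this.collector = collector;
        }

        /** Simulates what happens when a send fails and the bolt reports the error. */
        void simulateFailedSend(Exception cause) {
            collector.reportError(cause);
        }
    }

    /** Wrapper bolt: swaps in the decorated collector before delegating prepare(). */
    static final class MeteredBolt {
        private final PublishingBolt inner;
        private MeteredCollector metered;

        MeteredBolt(PublishingBolt inner) {
            this.inner = inner;
        }

        void prepare(Collector original) {
            metered = new MeteredCollector(original);
            inner.prepare(metered);
        }

        PublishingBolt inner() {
            return inner;
        }

        long failureCount() {
            return metered.failureCount();
        }
    }

    public static void main(String[] args) {
        MeteredBolt bolt = new MeteredBolt(new PublishingBolt());
        bolt.prepare(error -> System.out.println("original collector saw: " + error.getMessage()));
        bolt.inner().simulateFailedSend(new RuntimeException("broker unavailable"));
        System.out.println("failures recorded: " + bolt.failureCount());
    }
}
```

The point of the design is that the wrapped bolt needs no changes: it keeps calling reportError() on whatever collector it was given, and the decorator adds the logging and metrics before passing the error along to the original collector.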
This solution seems perfectly adequate for what I need, but it seems odd that KafkaBolt makes it so difficult to programmatically access those exceptions. I was wondering if maybe I was missing something obvious and if there was a better way to do this.
I don't think you're missing anything; you're likely just the first person with this need. Someone has to hit this issue and decide to fix it :)
If you'd like to make changes to the bolt to support custom error handling (e.g. by allowing the user to provide a callback as you suggest), you can raise an issue at https://issues.apache.org/jira/projects/STORM/issues and make a PR against https://github.com/apache/storm/pulls. You're of course also welcome to only raise the issue. Someone else might see it and decide to fix it, but contributing the fix yourself would likely be faster.
edit: You can find the bolt code at https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java