In my Flume agent I am collecting elements from Kafka topics and need to insert them into ES. However, the data must go through a digestion step first, so I need to write a custom sink that passes the data from the agent's channel to a Java digestion module (which I have already written).
Can anyone share a template of a custom sink that I can use as a reference? Flume's official documentation doesn't say much about this topic: "A custom sink's class and its dependencies must be included in the agent's classpath when starting the Flume agent. The type of the custom sink is its FQCN." https://flume.apache.org/FlumeUserGuide.html#custom-sink
And once the custom sink is ready, how could I link the following three files to make the agent work:
Thank you for any feedback. I will keep adding information as I progress in this task.
I assume you are using Flume to receive events from Kafka (source) and forward them to ES (sink), with some data-processing logic that you already have.
Given that, I would suggest looking into Flume interceptors, which are responsible for altering or filtering events on the fly before they are sent to the sink.
So all your business logic for altering the events can be implemented as a custom interceptor, which is configured on the Flume source (interceptors run before the events are written to the channel).
For reference, you can check out the source code of the built-in interceptors. This should give you an idea of how the Flume interceptor framework works.
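As a rough illustration, a custom interceptor could look something like the sketch below. To keep it compilable without Flume on the classpath, it declares minimal stand-ins that mirror org.apache.flume.Event and org.apache.flume.interceptor.Interceptor; in a real agent you would delete those and import the Flume types. The names DigestInterceptor and digest() are hypothetical, and the upper-casing is only a placeholder for the call into your own digestion module.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Stand-ins that mirror org.apache.flume.Event and
// org.apache.flume.interceptor.Interceptor so this sketch compiles alone.
// In a real agent, delete both and import the Flume types instead.
interface Event {
    Map<String, String> getHeaders();
    void setHeaders(Map<String, String> headers);
    byte[] getBody();
    void setBody(byte[] body);
}

interface Interceptor {
    void initialize();
    Event intercept(Event event);              // returning null drops the event
    List<Event> intercept(List<Event> events);
    void close();
}

// Stand-in event used only for the demo (Flume ships its own SimpleEvent).
class SimpleEvent implements Event {
    private Map<String, String> headers = new HashMap<>();
    private byte[] body = new byte[0];
    public Map<String, String> getHeaders() { return headers; }
    public void setHeaders(Map<String, String> headers) { this.headers = headers; }
    public byte[] getBody() { return body; }
    public void setBody(byte[] body) { this.body = body; }
}

// Hypothetical interceptor: the name and the digest logic are placeholders.
class DigestInterceptor implements Interceptor {
    @Override public void initialize() { }

    @Override
    public Event intercept(Event event) {
        String raw = new String(event.getBody(), StandardCharsets.UTF_8).trim();
        if (raw.isEmpty()) {
            return null; // filter out empty events
        }
        event.setBody(digest(raw).getBytes(StandardCharsets.UTF_8));
        event.getHeaders().put("digested", "true");
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        // Apply the single-event logic to a batch, keeping only surviving events.
        List<Event> kept = new ArrayList<>(events.size());
        for (Event e : events) {
            Event out = intercept(e);
            if (out != null) {
                kept.add(out);
            }
        }
        return kept;
    }

    @Override public void close() { }

    // Placeholder for the call into the existing digestion module.
    static String digest(String raw) {
        return raw.toUpperCase(Locale.ROOT);
    }

    // With Flume on the classpath this would implement Interceptor.Builder,
    // which additionally requires configure(Context).
    static class Builder {
        public Interceptor build() { return new DigestInterceptor(); }
    }
}
```

The agent instantiates the interceptor through its Builder, so the type in the config is the fully qualified class name followed by $Builder, and the compiled class must be on the agent's classpath.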
Here is the ES Sink source code
Sample Flume config
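a1.sources = kafkaSource
a1.sinks = ES_Sink
a1.channels = channel1
# Kafka source; its connection properties (brokers, topics) depend on the Flume version and are omitted here
a1.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.kafkaSource.channels = channel1
# The interceptor is attached to the source and referenced through its Builder class
a1.sources.kafkaSource.interceptors = i1
a1.sources.kafkaSource.interceptors.i1.type = org.apache.flume.interceptor.<Custom_Interceptor_name>$Builder
# Assumed channel type; any Flume channel can be used here
a1.channels.channel1.type = memory
a1.sinks.ES_Sink.channel = channel1
a1.sinks.ES_Sink.type = elasticsearch
a1.sinks.ES_Sink.hostNames = 127.0.0.1:9200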