
How to write a custom ES sink in Flume 1.7


In my Flume agent I am collecting elements from Kafka topics and need to insert them into ES. However, I need to run a digestion step in the sink first, so I need to write a custom sink that passes the data from the agent's channel to a Java digestion module (which I have already written).

Can anyone share a template of a custom sink that I can use as a reference? Flume's official website doesn't say much about this topic: "A custom sink's class and its dependencies must be included in the agent's classpath when starting the Flume agent. The type of the custom sink is its FQCN." https://flume.apache.org/FlumeUserGuide.html#custom-sink

And once the custom sink is ready, how do I link the following three files to make the agent work:

  • custom sink
  • ingestion jar (the Java module that performs the ingestion process)
  • FlumeAgent.properties

Thank you for any feedback. I will keep adding information as soon as I progress in this task.
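For reference, a minimal custom sink of the kind asked about could follow the process() pattern from the Flume Developer Guide: open a channel transaction, take up to a batch of events, hand each body to the digestion code, commit on success and roll back on failure. This is a sketch, not a drop-in implementation. DigestingSink, the batchSize property and the digest/index hooks are hypothetical names. The index hook is where you would call whichever Elasticsearch client you use; the sketch does not assume any particular client.

```java
import java.nio.charset.StandardCharsets;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

// Hypothetical custom sink: drains the channel in batches, runs each event body
// through a digestion hook, then passes the result to an indexing hook.
public class DigestingSink extends AbstractSink implements Configurable {
  private int batchSize;

  @Override
  public void configure(Context context) {
    // Reads a1.sinks.<sinkName>.batchSize (hypothetical property name);
    // keep it <= the channel's transactionCapacity.
    batchSize = context.getInteger("batchSize", 100);
  }

  @Override
  public Status process() throws EventDeliveryException {
    Channel channel = getChannel();
    Transaction txn = channel.getTransaction();
    txn.begin();
    try {
      int taken = 0;
      while (taken < batchSize) {
        Event event = channel.take();
        if (event == null) {
          break; // channel is empty for now
        }
        String raw = new String(event.getBody(), StandardCharsets.UTF_8);
        index(digest(raw));
        taken++;
      }
      txn.commit(); // events leave the channel only when the transaction commits
      // BACKOFF tells the sink runner to wait before polling an empty channel again
      return taken == 0 ? Status.BACKOFF : Status.READY;
    } catch (Throwable t) {
      txn.rollback(); // taken events return to the channel and will be retried
      if (t instanceof Error) {
        throw (Error) t;
      }
      throw new EventDeliveryException("Failed to process batch", t);
    } finally {
      txn.close();
    }
  }

  // Hypothetical hook: replace with the call into your digestion/ingestion jar.
  protected String digest(String raw) {
    return raw.trim();
  }

  // Hypothetical hook: replace with the write to Elasticsearch.
  protected void index(String document) {
    // no-op in this sketch
  }
}
```

In FlumeAgent.properties the sink would be referenced by its fully qualified class name, for example a1.sinks.k1.type = com.example.DigestingSink (com.example being a hypothetical package), together with a1.sinks.k1.channel. The compiled sink jar and the ingestion jar both need to be on the agent's classpath, for instance under $FLUME_HOME/plugins.d/<plugin-dir>/lib and plugins.d/<plugin-dir>/libext respectively. Because the transaction commits only after the whole batch is indexed, a failure part-way through re-delivers documents that were already written, so the indexing step should tolerate duplicates.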


Solution

  • I assume you are using Flume to receive events from Kafka (source) and forward them to ES (sink), and that you already have the data-processing logic.

    Given that, I would suggest looking into Flume interceptors, which can alter or filter events on the fly before they are written to the channel and, from there, consumed by the sink.

    So all of your business logic for altering the events can be implemented as a custom interceptor, which is configured on the Kafka source (interceptors are attached to sources, not channels).

    For reference, you can check out the source code of the interceptors that ship with Flume. This should give you an idea of how the Flume interceptor framework works.

    Here is the ES Sink source code

    Sample Flume config

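    a1.sources = kafkaSource
    a1.sinks = ES_Sink
    a1.channels = channel1
    
    a1.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
    a1.sources.kafkaSource.kafka.bootstrap.servers = <kafka_host>:9092
    a1.sources.kafkaSource.kafka.topics = <kafka_topic>
    a1.sources.kafkaSource.channels = channel1
    a1.sources.kafkaSource.interceptors = i1
    a1.sources.kafkaSource.interceptors.i1.type = <your.package>.<Custom_Interceptor_name>$Builder
    
    a1.channels.channel1.type = memory
    
    a1.sinks.ES_Sink.channel = channel1
    a1.sinks.ES_Sink.type = elasticsearch
    # The ES sink's default transport client uses port 9300, not the HTTP port 9200
    a1.sinks.ES_Sink.hostNames = 127.0.0.1:9300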
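A custom interceptor implements Flume's Interceptor interface plus a nested Builder, which is what the $Builder suffix in the interceptor type refers to. The sketch below assumes the digestion module can be called with the event body as a UTF-8 string. DigestionInterceptor, its header property and the digest method are hypothetical names standing in for your own code. Returning null from intercept(Event) drops the event, which is how filtering works.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

// Hypothetical interceptor that runs each event body through a digestion step.
public class DigestionInterceptor implements Interceptor {
  private final String headerName;

  private DigestionInterceptor(String headerName) {
    this.headerName = headerName;
  }

  @Override
  public void initialize() {
    // open resources needed by the digestion module here, if any
  }

  @Override
  public Event intercept(Event event) {
    String raw = new String(event.getBody(), StandardCharsets.UTF_8);
    String digested = digest(raw);
    if (digested == null) {
      return null; // null means "drop this event"
    }
    event.setBody(digested.getBytes(StandardCharsets.UTF_8));
    event.getHeaders().put(headerName, "true"); // mark the event as processed
    return event;
  }

  @Override
  public List<Event> intercept(List<Event> events) {
    List<Event> out = new ArrayList<Event>(events.size());
    for (Event e : events) {
      Event result = intercept(e);
      if (result != null) {
        out.add(result); // keep only events that were not dropped
      }
    }
    return out;
  }

  @Override
  public void close() {
    // release resources opened in initialize()
  }

  // Hypothetical stand-in for the call into your digestion jar:
  // trims the body and drops blank events.
  static String digest(String raw) {
    String trimmed = raw.trim();
    return trimmed.isEmpty() ? null : trimmed;
  }

  // Flume instantiates this via the "...$Builder" type in the agent config.
  public static class Builder implements Interceptor.Builder {
    private String headerName;

    @Override
    public void configure(Context context) {
      // reads a1.sources.<src>.interceptors.<name>.header (hypothetical property)
      headerName = context.getString("header", "digested");
    }

    @Override
    public Interceptor build() {
      return new DigestionInterceptor(headerName);
    }
  }
}
```

With this class, the interceptor type would be its fully qualified name plus $Builder, e.g. com.example.DigestionInterceptor$Builder (hypothetical package). The compiled jar and your digestion jar go on the agent's classpath, for example under $FLUME_HOME/plugins.d/<plugin-dir>/lib and plugins.d/<plugin-dir>/libext. Doing the digestion here lets you keep the stock elasticsearch sink instead of maintaining a custom one.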