apache-kafka flume flume-ng

Flume: Routing events to the proper topic partition with Kafka channel


In Flume, when using a Kafka channel, is there a way to influence what partition an event is sent to?

With the Kafka sink, the "key" header of the Flume event is apparently used to choose a partition, but I could not find any documentation on partitioning with the Kafka channel.


Solution

  • The Kafka channel for Flume does not support mapping an Event header to a partition key out-of-the-box like KafkaSink does.

    However, modifying it so that it does is not too complicated. As I am not sure I can share the code, I will just give directions:

    1. Add a configuration key for the name of the header that will be mapped to the partition key.
    2. In the inner class KafkaTransaction, replace byte[] in the type of the member serializedEvents with something that can also hold a String key for each event (either an inner class, or even a Kafka KeyedMessage<String, byte[]>).
    3. In the method KafkaTransaction.doPut(Event event), retrieve the key from the event headers and store it in serializedEvents together with the serialized message.
    4. In the method KafkaTransaction.doCommit(), use the key stored with each serialized event instead of batchUUID.

    NOTE that, because events with different keys can land on different partitions, the events of one transaction are no longer guaranteed to be processed by a single KafkaChannel instance at the consumer end of the channel. Check that this is compatible with your use case (transaction size, etc.).
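
The four steps in the answer above can be sketched roughly as follows. This is not the actual KafkaChannel source: KeyedTransactionSketch and KeyedPayload are hypothetical stand-ins, the event is reduced to a header map plus a body, and doCommit() returns the keyed messages instead of handing them to a Kafka producer. Falling back to the batch UUID when the header is missing is an assumption of this sketch, not something the answer prescribes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for KafkaChannel's inner KafkaTransaction; illustration only. */
final class KeyedTransactionSketch {

    /** Step 2: holds a partition key next to each serialized event (instead of a bare byte[]). */
    static final class KeyedPayload {
        final String key;
        final byte[] payload;

        KeyedPayload(String key, byte[] payload) {
            this.key = key;
            this.payload = payload;
        }
    }

    // Step 1: name of the header to map to the partition key (would come from configuration).
    private final String keyHeader;
    // Key used when an event lacks the header (assumption: fall back to the batch UUID).
    private final String batchUuid;
    private final List<KeyedPayload> serializedEvents = new ArrayList<>();

    KeyedTransactionSketch(String keyHeader, String batchUuid) {
        this.keyHeader = keyHeader;
        this.batchUuid = batchUuid;
    }

    /** Step 3: read the key from the headers and store it with the serialized event. */
    void doPut(Map<String, String> headers, byte[] serializedEvent) {
        String key = headers.get(keyHeader);
        serializedEvents.add(new KeyedPayload(key != null ? key : batchUuid, serializedEvent));
    }

    /** Step 4: emit each message with its own key rather than one key for the whole batch. */
    List<KeyedPayload> doCommit() {
        List<KeyedPayload> messages = new ArrayList<>(serializedEvents);
        serializedEvents.clear();
        return messages; // a real channel would send these through its Kafka producer
    }
}
```

In a real patch, each KeyedPayload would become one keyed producer message, so events that share a header value go to the same partition while other events in the same transaction may go elsewhere.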