In Flume, when using a Kafka channel, is there a way to influence what partition an event is sent to?
With the Kafka sink, the "key" FlumeEvent header is apparently used to choose a partition, but I could not find any documentation regarding partitions with the Kafka channel.
The Kafka channel for Flume does not support mapping an Event header to a partition key out-of-the-box like KafkaSink does.
However, modifying it so that it does is not too complicated. As I am not sure I can share the code, I will just give directions:
- Replace byte[] in the type of the member serializedEvents with something that can also hold a String key for each event (either an inner class, or even a Kafka KeyedMessage<String, byte[]>).
- In KafkaTransaction.doPut(Event event), retrieve the key from the headers and store it in serializedEvents together with the serialized message.
- In KafkaTransaction.doCommit(), use the key stored with each serialized event instead of batchUUID.

NOTE that events in a transaction will no longer be guaranteed to be processed by a single KafkaChannel instance at the consumer end of the channel, so you'll have to check that this is compatible with your use case (regarding transaction size, etc).
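To make the directions above more concrete, here is a minimal, self-contained sketch of the idea. It is not the actual KafkaChannel code and uses no Flume or Kafka classes: KeyedTransactionSketch, KeyedEvent and KEY_HEADER are hypothetical names, the header name "key" is assumed to mirror the one the Kafka sink reads, and falling back to batchUUID when the header is missing is my own assumption rather than something the real channel does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/** Stand-in for the modified KafkaTransaction (hypothetical, no Flume/Kafka dependencies). */
class KeyedTransactionSketch {

    /** Hypothetical holder that replaces the bare byte[] stored in serializedEvents. */
    static final class KeyedEvent {
        final String key;     // partition key, or null if the header was absent
        final byte[] payload; // serialized event

        KeyedEvent(String key, byte[] payload) {
            this.key = key;
            this.payload = payload;
        }
    }

    /** Assumed header name, chosen to match the "key" header used with the Kafka sink. */
    static final String KEY_HEADER = "key";

    private final String batchUUID = UUID.randomUUID().toString();
    private final List<KeyedEvent> serializedEvents = new ArrayList<>();

    /** doPut step: keep the key from the headers next to the serialized message. */
    void doPut(Map<String, String> headers, byte[] serializedBody) {
        serializedEvents.add(new KeyedEvent(headers.get(KEY_HEADER), serializedBody));
    }

    /**
     * doCommit step: build one keyed message per event. In the real channel these
     * would be handed to the Kafka producer; here they are simply returned.
     */
    List<KeyedEvent> doCommit() {
        List<KeyedEvent> messages = new ArrayList<>();
        for (KeyedEvent event : serializedEvents) {
            // Assumption: events without a key header keep the old batchUUID key.
            String key = (event.key != null) ? event.key : batchUUID;
            messages.add(new KeyedEvent(key, event.payload));
        }
        serializedEvents.clear();
        return messages;
    }

    String batchUUID() {
        return batchUUID;
    }
}
```

With this shape, two events put in the same transaction with different "key" headers leave doCommit() with different message keys, which is exactly why they may land on different partitions and be consumed by different channel instances.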