I have set up a Flume agent that uses the Spooling Directory source to read CSV files from a directory.
I am using the KafkaChannel channel type to push these messages onto a Kafka topic with 10 partitions, which is later consumed by a Spark application.
The issue I am having is that each file is written to a single partition of the Kafka topic. Some files are considerably larger than others, so the messages are spread very unevenly across the topic's partitions. This makes it very difficult to allocate the right amount of resources to my Spark executors, as some end up doing all the heavy lifting while others sit idle, waiting for messages to arrive on their partition.
Is there any way to configure the KafkaChannel in Flume to balance messages across the topic's partitions, or to limit the number of messages sent at a time so that the load is spread across all available partitions?
I have played around with the following configuration options in flume without success:
a1.channels.kafkaChannel.capacity = 100
a1.channels.kafkaChannel.transactionCapacity = 100
a1.channels.kafkaChannel.batch.size = 100
The KafkaChannel source code has been slightly modified to meet my needs, but all of the default configuration options documented here are still available: http://flume.apache.org/FlumeUserGuide.html#kafka-channel
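For what it's worth, the capacity/batch options above don't influence partitioning. The skew is consistent with how the old (Kafka 0.8-era) producer used by the Flume 1.6 KafkaChannel handles keyless messages: it picks a random partition and sticks to it until the next topic-metadata refresh (10 minutes by default), so a file drained in one burst tends to land entirely on one partition. Since properties prefixed with `kafka.` are passed through to the producer, lowering that refresh interval might help spread the load; this is an untested sketch, and whether it applies to your modified channel is an assumption:

```
# Passed through to the underlying Kafka 0.8 producer via the
# documented kafka.* prefix of the Flume 1.6 KafkaChannel.
# Refresh topic metadata every 10 seconds instead of the default
# 10 minutes, so the keyless producer re-picks a partition sooner.
a1.channels.kafkaChannel.kafka.topic.metadata.refresh.interval.ms = 10000
```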
Full configuration file (hostnames and other sensitive information removed):
a1.sources = src
a1.channels = kafkaChannel
a1.sources.src.type = spooldir
a1.sources.src.channels = kafkaChannel
a1.sources.src.spoolDir = /data/silk/flume/V5
a1.sources.src.fileHeader = true
a1.sources.src.trackerDir = .flumespool
a1.sources.src.consumeOrder = oldest
a1.sources.src.deletePolicy = immediate
a1.sources.src.decodeErrorPolicy = REPLACE
a1.sources.src.pollDelay = 12000
a1.channels.kafkaChannel.type = com.example.flume.channel.kafka.KafkaChannel
a1.channels.kafkaChannel.brokerList = host1:9092,host2:9092,host3:9092
a1.channels.kafkaChannel.topic = TEST-TOPIC
a1.channels.kafkaChannel.capacity = 100
a1.channels.kafkaChannel.transactionCapacity = 100
a1.channels.kafkaChannel.zookeeperConnect = host1:2181,host2:2181,host3:2181
a1.channels.kafkaChannel.parseAsFlumeEvent = false
Any help is appreciated, thanks in advance!
For anyone else facing this issue, I have found a workaround:
By replacing the KafkaChannel with a MemoryChannel feeding a KafkaSink, instead of pushing logs straight onto a KafkaChannel, I can see that the messages are balanced much more evenly across my Kafka topic's partitions.
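In case it helps anyone, here is a minimal sketch of that working setup. Property names follow the Flume 1.6 KafkaSink documentation; the capacities and hosts are placeholders matching my original config, so adjust them for your environment:

```
a1.sources = src
a1.channels = memChannel
a1.sinks = kafkaSink

# Buffer events in memory between the spooldir source and the sink
a1.channels.memChannel.type = memory
a1.channels.memChannel.capacity = 10000
a1.channels.memChannel.transactionCapacity = 100

# KafkaSink publishes events to the topic; with no message key set,
# batches are distributed across partitions by the producer
a1.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.kafkaSink.channel = memChannel
a1.sinks.kafkaSink.brokerList = host1:9092,host2:9092,host3:9092
a1.sinks.kafkaSink.topic = TEST-TOPIC
a1.sinks.kafkaSink.batchSize = 100
```

The source section (spoolDir, deletePolicy, etc.) stays the same as before; only its channel binding changes to `memChannel`.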