apache-kafka, avro, parquet, apache-kafka-connect, confluent-platform

How to commit Kafka messages to an HDFS sink on reaching a specific size (128 MB)


My setup: Kafka (Confluent Platform 5.0.0) produces Avro messages. A Connect worker running the HDFS Sink Connector streams these messages to an HDFS node in Parquet format. I configured the worker to commit messages to HDFS every 5000 messages (flush.size=5000), and this configuration works fine.

My question: is there any workaround to commit messages once a file reaches a specific size, such as 128 MB (or 256 MB), rather than after a fixed message count?

My HDFS Connector configuration file:

name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
tasks.max=1
topics=some_topic
hdfs.url=hdfs://hdfshost:8020/user/someuser/kafka_hdfs_sink/
flush.size=5000

Solution

  • There is no such configuration - see this open issue

    The workaround is to know, on average, how large each message is for the topic (per Kafka partition, because that is how files get written), then set flush.size so the resulting files land near the HDFS block size or a multiple of it; a worked example follows this list.

    If you use the TimeBasedPartitioner, then you will have to know either the number of messages or the time at which your messages will reach your target size; a time-based rotation sketch also follows this list.
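    As a worked example with hypothetical numbers (measure your real average from files the connector has already written): if an average Parquet-encoded record comes to roughly 25 KB on disk, then 128 MB / 25 KB = 134,217,728 / 25,600 ≈ 5243 records, so a flush.size near 5000 would commit files just under 128 MB. Re-measure whenever the schema or payload mix changes, since the estimate drifts with message size.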
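    If size cannot be targeted directly, time-based file rotation is the closest supported alternative. Below is a minimal sketch extending the configuration above; rotate.interval.ms and the TimeBasedPartitioner settings are standard HDFS connector options, but verify them against your connector version, and the interval/duration values here are illustrative only:

    name=hdfs-sink
    connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
    format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
    tasks.max=1
    topics=some_topic
    hdfs.url=hdfs://hdfshost:8020/user/someuser/kafka_hdfs_sink/
    # flush.size is still required; it acts as an upper bound on records per file
    flush.size=5000
    # commit the open file once this much time has elapsed (illustrative: 10 minutes)
    rotate.interval.ms=600000
    # partition output directories by time; the four settings below are required by this partitioner
    partitioner.class=io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner
    partition.duration.ms=3600000
    path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
    locale=en-US
    timezone=UTC

    With this setup files commit on a time cadence rather than a size target, so you still need the flush.size estimate above if you want files to land near a given size.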