Tags: apache-spark, pyspark, apache-kafka, apache-spark-sql, kafka-producer-api

Spark: How to use a custom partitioner when writing data to Kafka


When writing data to Kafka, you can supply a column named key, and its value is used to choose the partition:

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .save()

I need to choose the partition manually, independently of the key. Is it possible to specify the partition directly? Or can I provide a custom partitioner, so that I control the logic that chooses the partition?


Solution

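  • Set the kafka.partitioner.class option to the fully qualified class name of your custom partitioner, which implements the partitioning logic you need. Spark passes options prefixed with kafka. through to the underlying Kafka producer, so the class must be on the classpath of the Spark executors.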

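    // Placeholder: obtain the writer from your DataFrame, e.g. df.writeStream.format("kafka")
    val dataStreamWriter: DataStreamWriter[Row] = ???
    // Options prefixed with "kafka." are forwarded to the Kafka producer configuration
    dataStreamWriter.option("kafka.partitioner.class", "com.example.CustomKafkaPartitioner")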
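
For illustration, here is a minimal sketch of what such a partitioner class might look like. It implements Kafka's org.apache.kafka.clients.producer.Partitioner interface and needs the kafka-clients library on the classpath. The class name com.example.CustomKafkaPartitioner is taken from the option above; the rule itself (hashing the serialized value and ignoring the key) is only an assumed example, not the answer author's logic, so replace it with whatever rule you need.

```scala
package com.example

import java.util.{Arrays, Map => JMap}

import org.apache.kafka.clients.producer.Partitioner
import org.apache.kafka.common.Cluster

// Hypothetical example: picks the partition from the record value and ignores the key.
class CustomKafkaPartitioner extends Partitioner {

  // Called once with the producer configuration; nothing to configure in this sketch.
  override def configure(configs: JMap[String, _]): Unit = ()

  override def partition(
      topic: String,
      key: Any,
      keyBytes: Array[Byte],
      value: Any,
      valueBytes: Array[Byte],
      cluster: Cluster): Int = {
    // Number of partitions currently known for the target topic.
    val numPartitions = cluster.partitionsForTopic(topic).size

    // Assumed rule: hash the serialized value; fall back to partition 0
    // when there is no value or no partition metadata.
    if (valueBytes == null || numPartitions <= 0) 0
    else Math.floorMod(Arrays.hashCode(valueBytes), numPartitions)
  }

  // No resources to release.
  override def close(): Unit = ()
}
```

Spark's Kafka sink hands the key and value to the producer as byte arrays, so the logic has to work on keyBytes and valueBytes rather than on typed objects. One way to make the class available to the executors is to package it in a jar and pass that jar with spark-submit --jars.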