When writing data to Kafka, a column named key can be used to choose the partition:
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .save()
I need to decide the partition manually, independently of the key. Is it possible to specify the partition manually? Or to supply a custom partitioner so that I control the logic that chooses the partition?
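You just have to set the kafka.partitioner.class option to the fully qualified name of your custom partitioner class, which implements the appropriate logic. Options prefixed with kafka. are passed through to the underlying Kafka producer, so the same option can also be set on the batch writer (df.write) from the question.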
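import org.apache.spark.sql.Row
import org.apache.spark.sql.streaming.DataStreamWriter
val dataStreamWriter: DataStreamWriter[Row] = ???  // placeholder for your existing streaming writer
dataStreamWriter.option("kafka.partitioner.class", "com.example.CustomKafkaPartitioner")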
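For illustration, here is a minimal sketch of what such a partitioner class might look like. It implements Kafka's org.apache.kafka.clients.producer.Partitioner interface. The class name com.example.CustomKafkaPartitioner comes from the option above, but the routing rule (first byte of the value modulo the partition count) and the helper choosePartition are purely hypothetical stand-ins for your own logic. It assumes Spark hands keys and values to the producer as byte arrays, so it works from valueBytes rather than the typed value.

```scala
package com.example

import java.util.{Map => JMap}

import org.apache.kafka.clients.producer.Partitioner
import org.apache.kafka.common.Cluster

object CustomKafkaPartitioner {
  // Hypothetical routing rule, kept as a pure function so it can be tested
  // without a broker: first byte of the value, modulo the partition count.
  def choosePartition(valueBytes: Array[Byte], numPartitions: Int): Int =
    if (numPartitions <= 0 || valueBytes == null || valueBytes.isEmpty) 0
    else (valueBytes(0) & 0xff) % numPartitions
}

class CustomKafkaPartitioner extends Partitioner {
  // No custom configuration is read in this sketch.
  override def configure(configs: JMap[String, _]): Unit = ()

  // Called by the Kafka producer for each record; the key is ignored on purpose.
  override def partition(topic: String,
                         key: AnyRef,
                         keyBytes: Array[Byte],
                         value: AnyRef,
                         valueBytes: Array[Byte],
                         cluster: Cluster): Int = {
    val numPartitions = cluster.partitionsForTopic(topic).size()
    CustomKafkaPartitioner.choosePartition(valueBytes, numPartitions)
  }

  // Nothing to release.
  override def close(): Unit = ()
}
```

The producer instantiates the class by name on the executors, so the jar containing it presumably needs to be on the executor classpath (for example, shipped with the application jar).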