I am developing a process that writes to different Iceberg tables with different partitions. Previously, before writing data into Iceberg with Spark, we would repartition and sort the DataFrame by the partition columns first. Now I can't do that. I found that setting the Iceberg table property write.distribution-mode = 'hash' should help, but it did not work when I tried.
Does anyone know why the property does not work, or is there any way to make Spark repartition automatically when writing to Iceberg?
write.distribution-mode - a property that controls the distribution mode for parallel writes.
https://iceberg.apache.org/docs/latest/configuration/
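Note that write.distribution-mode is a table property, so it is normally set on the table itself rather than passed per write. A minimal sketch using Spark SQL (the catalog, database, and table names are placeholders):

    # Set the distribution mode as a table property;
    # subsequent writes to this table pick it up
    spark.sql(
        "ALTER TABLE my_catalog.db.my_table "
        "SET TBLPROPERTIES ('write.distribution-mode' = 'hash')"
    )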
df.repartition - repartitions the DataFrame by the given columns.
https://sparkbyexamples.com/pyspark/pyspark-repartition-usage/
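As a quick illustration (the column name here is hypothetical):

    # Hash-partition the DataFrame by a column so that rows with
    # the same key end up in the same Spark partition before the write
    df = df.repartition("event_date")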
So, I'm hoping you can use this example.
In Python
from pyspark.sql import SparkSession

def write_to_iceberg(df, table_path, partition_column):
    # Repartition based on partition_column so each Spark task
    # writes to as few Iceberg partitions as possible
    df = df.repartition(partition_column)
    # Write the DataFrame
    df.write.format("iceberg") \
        .mode("append") \
        .option("write.distribution-mode", "hash") \
        .save(table_path)

# Create session
spark = SparkSession.builder.appName("IcebergExample").getOrCreate()

# Example DataFrame
df = spark.createDataFrame([("A", 1), ("B", 2), ("A", 3)], ["partition_column", "value"])

# Define the list of table paths and partition columns
# (both tables here use the same example column so the code runs
# against the DataFrame above; in practice each entry can name a
# different column of df)
table_configs = [
    {"table_path": "path_to_table_1", "partition_column": "partition_column"},
    {"table_path": "path_to_table_2", "partition_column": "partition_column"},
    # Add more table configs as needed
]

# Iterate through the table configurations and write the DataFrame to each table
for config in table_configs:
    write_to_iceberg(df, config["table_path"], config["partition_column"])

# Stop the session
spark.stop()
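Since you mention that you also sorted by the partition columns before writing, you can combine repartition with sortWithinPartitions to reproduce that manually; a minimal sketch (the column name is a placeholder):

    # Cluster rows by partition value, then sort within each
    # Spark partition so the writer sees ordered input
    df = df.repartition("partition_column") \
           .sortWithinPartitions("partition_column")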