Tags: python, apache-spark, pyspark, apache-iceberg

Spark write to Iceberg table without repartition


I am developing a process that writes to different Iceberg tables with different partitioning. Previously, before writing data into Iceberg with Spark, we would repartition and sort by the partition columns first. Now I can't do that. I found that setting the Iceberg table property write.distribution-mode = 'hash' should help, but it did not work when I tried it.

Does anyone know why the property does not work, or is there any way to make Spark repartition automatically when writing to Iceberg?


Solution

  • write.distribution-mode: the table property that controls the distribution mode for parallel writes. An example of setting it is shown below.

    https://iceberg.apache.org/docs/latest/configuration/
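
    The property is set on the table itself. A minimal sketch via Spark SQL (the table name my_catalog.db.events is a placeholder, and an Iceberg catalog is assumed to be configured in the session):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("SetIcebergProperty").getOrCreate()

    # Placeholder table name; assumes an Iceberg catalog is configured
    spark.sql(
        "ALTER TABLE my_catalog.db.events "
        "SET TBLPROPERTIES ('write.distribution-mode' = 'hash')"
    )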

    df.repartition: explicitly repartitions the DataFrame by the given column(s) yourself.

    https://sparkbyexamples.com/pyspark/pyspark-repartition-usage/
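
    For example, a minimal sketch (the column name event_date is a placeholder for your table's partition column):

    # Co-locate and order rows by the partition column before writing
    df = df.repartition("event_date").sortWithinPartitions("event_date")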

    So, I'm hoping you can adapt this example.

    In Python

    from pyspark.sql import SparkSession
    
    def write_to_iceberg(df, table_path, partition_column):
        # Manually co-locate rows by the partition column before writing
        df = df.repartition(partition_column)
    
        # Append to the Iceberg table. Note: as a per-write option the key
        # is "distribution-mode"; "write.distribution-mode" is the name of
        # the table property
        df.write.format("iceberg") \
            .mode("append") \
            .option("distribution-mode", "hash") \
            .save(table_path)
    
    # Create session
    spark = SparkSession.builder.appName("IcebergExample").getOrCreate()
    
    # Example DataFrame
    df = spark.createDataFrame([("A", 1), ("B", 2), ("A", 3)], ["partition_column", "value"])
    
    # Placeholder table paths and partition columns; replace with your own
    table_configs = [
        {"table_path": "path_to_table_1", "partition_column": "partition_column_1"},
        {"table_path": "path_to_table_2", "partition_column": "partition_column_2"},
        # Add more table configs as needed
    ]
    
    # Iterate through the table configurations and write the DataFrame to each table
    for config in table_configs:
        write_to_iceberg(df, config["table_path"], config["partition_column"])
    
    # Stop the session
    spark.stop()
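
    If the table property still seems to have no effect, it may be worth trying the DataFrameWriterV2 API (df.writeTo), which is the write path the Iceberg docs recommend for Spark 3. A minimal sketch, assuming a placeholder table my_catalog.db.events that already exists with the property set:

    # Placeholder table name; assumes write.distribution-mode = 'hash'
    # is already set on the table
    df.writeTo("my_catalog.db.events").append()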