Search code examples
apache-sparkhiveapache-hudi

Writing spark DataFrame In Apache Hudi Table


I am new to apace hudi and trying to write my dataframe in my Hudi table using spark shell. For type first time i am not creating any table and writing in overwrite mode so I am expecting it will create hudi table.I am Writing below code.

    spark-shell \
    --packages org.apache.hudi:hudi-spark-bundle_2.12:0.7.0,org.apache.spark:spark-avro_2.12:3.0.1 \
    --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

    //Initialize a Spark Session for Hudi

    import org.apache.spark.sql.SaveMode
    import org.apache.spark.sql.SaveMode
    import org.apache.spark.sql.functions._
    import org.apache.hudi.DataSourceWriteOptions
    import org.apache.hudi.config.HoodieWriteConfig
    import org.apache.hudi.hive.MultiPartKeysValueExtractor
    import org.apache.spark.sql.SparkSession

    val spark1 = SparkSession.builder().appName("hudi-datalake").master("local[*]").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").config("spark.sql.hive.convertMetastoreParquet", "false").getOrCreat ()

    //Write to a Hudi Dataset 

    val inputDF = Seq(
    ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
    ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
    ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
    ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"),
    ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"),
    ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z")
    ).toDF("id", "creation_date", "last_update_time")

    val hudiOptions = Map[String,String](
    HoodieWriteConfig.TABLE_NAME -> "work.hudi_test",
    DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE", 
    DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id",
    DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date",
    DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time",
    DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true",
    DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "work.hudi_test",
    DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date",
    DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName)
    //  Upsert Data
    // Create a new DataFrame from the first row of inputDF with a different creation_date value

    val updateDF = inputDF.limit(1).withColumn("creation_date", lit("2014-01-01"))

    updateDF.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.overwrite).saveAsTable("work.hudi_test")

    while writing this write statement i m getting below error message.

    java.lang.NoClassDefFoundError: org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2

can somone Kindly Guide me how should I write this statement.


Solution

  • Here is a working sample for your question in pyspark:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import lit
    
    spark = (
        SparkSession.builder.appName("Hudi_Data_Processing_Framework")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .config("spark.sql.hive.convertMetastoreParquet", "false")
        .config(
            "spark.jars.packages",
            "org.apache.hudi:hudi-spark-bundle_2.12:0.7.0,org.apache.spark:spark-avro_2.12:3.0.2"
        )
        .getOrCreate()
    )
    
    input_df = spark.createDataFrame(
        [
            ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
            ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
            ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
            ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"),
            ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"),
            ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z"),
        ],
        ("id", "creation_date", "last_update_time"),
    )
    
    hudi_options = {
        # ---------------DATA SOURCE WRITE CONFIGS---------------#
        "hoodie.table.name": "hudi_test",
        "hoodie.datasource.write.recordkey.field": "id",
        "hoodie.datasource.write.precombine.field": "last_update_time",
        "hoodie.datasource.write.partitionpath.field": "creation_date",
        "hoodie.datasource.write.hive_style_partitioning": "true",
        "hoodie.upsert.shuffle.parallelism": 1,
        "hoodie.insert.shuffle.parallelism": 1,
        "hoodie.consistency.check.enabled": True,
        "hoodie.index.type": "BLOOM",
        "hoodie.index.bloom.num_entries": 60000,
        "hoodie.index.bloom.fpp": 0.000000001,
        "hoodie.cleaner.commits.retained": 2,
    }
    
    # INSERT
    (
        input_df.write.format("org.apache.hudi")
        .options(**hudi_options)
        .mode("append")
        .save("/tmp/hudi_test")
    )
    
    # UPDATE
    update_df = input_df.limit(1).withColumn("creation_date", lit("2014-01-01"))
    (
        update_df.write.format("org.apache.hudi")
        .options(**hudi_options)
        .mode("append")
        .save("/tmp/hudi_test")
    )
    
    # REAL UPDATE
    update_df = input_df.limit(1).withColumn("last_update_time", lit("2016-01-01T13:51:39.340396Z"))
    (
        update_df.write.format("org.apache.hudi")
        .options(**hudi_options)
        .mode("append")
        .save("/tmp/hudi_test")
    )
    
    output_df = spark.read.format("org.apache.hudi").load(
        "/tmp/hudi_test/*/*"
    )
    
    output_df.show()
    

    Output: enter image description here

    Hudi table in Filesystem looks as follows: enter image description here

    Note: Your update operation actually creates a new partition and it does an insert, since you are modifying the partition column (2015-01-01 -> 2014-01-01). You can see that in the output.

    And I have provided an update example where it updates the last_update_time to 2016-01-01T13:51:39.340396Z which actually updates the id 100 in partition 2015-01-01 from 2015-01-01T13:51:39.340396Z to 2016-01-01T13:51:39.340396Z

    More samples can be found in Hudi Quick Start Guide