
PySpark: querying Hudi partitioned table


I'm following the Apache Hudi documentation to write and read a Hudi table. Here's the code I'm using to create a PySpark DataFrame and save it to Azure Data Lake Storage Gen2:

tableName = "my_hudi_table"
basePath = "<<table_path>>"  # placeholder for the table path in the data lake
dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10))
df_hudi = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
hudi_options = {
    'hoodie.table.name': tableName,
    'hoodie.datasource.write.recordkey.field': 'uuid',
    'hoodie.datasource.write.partitionpath.field': 'partitionpath',
    'hoodie.datasource.write.table.name': tableName,
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.precombine.field': 'ts',
    'hoodie.upsert.shuffle.parallelism': 2,
    'hoodie.insert.shuffle.parallelism': 2
}

df_hudi.write.format("hudi").options(**hudi_options).mode("overwrite").save(basePath)

This works and generates the expected folder structure from the partition field partitionpath. The field takes values such as "americas/brazil/sao_paulo", so nested subfolders like these are created:

  • americas
    • brazil
      • sao_paulo
        • parquet_file
        • .hoodie_partition_metadata
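
To make the layout above concrete, here is a minimal sketch of how one partitionpath value maps to nested directories. The helper name partition_dirs is hypothetical and not part of Hudi; it only splits the string the same way the folder tree is laid out.

```python
from pathlib import PurePosixPath
from typing import List


def partition_dirs(partition_value: str) -> List[str]:
    """Split a partition path value into its nested directory names.

    Hypothetical helper for illustration only; not a Hudi API.
    """
    return list(PurePosixPath(partition_value).parts)


levels = partition_dirs("americas/brazil/sao_paulo")
print(levels)       # ['americas', 'brazil', 'sao_paulo']
print(len(levels))  # 3
```

The number of levels (3 here) is the depth of the partition directories below the table base path.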

However, when I try to read this table I get an empty DataFrame. Interestingly, the empty DataFrame has the correct schema:

partition_column = "partitionpath"

hudi_read_options = {
    'hoodie.datasource.read.partitionpath.field': partition_column,
    'hoodie.file.index.enable': 'false'
}

df_hudi = spark.read.format("hudi").options(**hudi_read_options).load(basePath)
df_hudi.printSchema()

root
 |-- _hoodie_commit_time: string (nullable = true)
 |-- _hoodie_commit_seqno: string (nullable = true)
 |-- _hoodie_record_key: string (nullable = true)
 |-- _hoodie_partition_path: string (nullable = true)
 |-- _hoodie_file_name: string (nullable = true)
 |-- begin_lat: double (nullable = true)
 |-- begin_lon: double (nullable = true)
 |-- driver: string (nullable = true)
 |-- end_lat: double (nullable = true)
 |-- end_lon: double (nullable = true)
 |-- fare: double (nullable = true)
 |-- partitionpath: string (nullable = true)
 |-- rider: string (nullable = true)
 |-- ts: long (nullable = true)
 |-- uuid: string (nullable = true)

But if I read a specific partition directly as Parquet, I get the data:

df = spark.read.format("parquet").load(basePath + "/americas/brazil/sao_paulo/")

I feel like I might be missing some additional parameter when reading, since I can query the data using Spark Structured Streaming and it works fine:

spark.readStream.format("hudi").load(basePath)

I'm running this code on an Azure Databricks cluster with the following specs:

Databricks Runtime Version - 9.1 LTS (includes Apache Spark 3.1.2, Scala 2.12)

Python version - Python 3.8.10

I have installed the following package from the Maven repository: "hudi_spark3_1_2_bundle_2_12_0_10_1.jar"

I'm setting 'hoodie.file.index.enable': 'false' when reading, as suggested in this GitHub issue, because otherwise I get the error:

NoSuchMethodError: org.apache.spark.sql.execution.datasources.FileStatusCache.putLeafFiles(Lorg/apache/hadoop/fs/Path;[Lorg/apache/hadoop/fs/FileStatus;)V

Am I missing something? Thanks in advance.


Solution

  • spark.read.format("hudi").load(basePath) may not work on Databricks because of the NoSuchMethodError you describe, which is why the file index has to be disabled. The Hudi documentation says this about the file index:

    Since 0.9.0 hudi has supported a hudi built-in FileIndex: HoodieFileIndex to query hudi table, which supports partition pruning and metatable for query. This will help improve query performance. It also supports non-global query path which means users can query the table by the base path without specifying the "*" in the query path.

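    With HoodieFileIndex disabled, querying by the bare base path is no longer supported, so the load path needs a glob that reaches the partition directories: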

    hudi_read_options = {
        'hoodie.file.index.enable': 'false'
    }
    spark.read.format("hudi").options(**hudi_read_options).load(basePath + "/*/*/*")
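
If the partition depth may change, the glob suffix can be built rather than hard-coded. This is a minimal sketch, assuming one "/*" per partition directory level as in the answer above; partition_glob and the example base path are hypothetical and not part of Hudi or Spark.

```python
def partition_glob(base_path: str, depth: int) -> str:
    """Append one '/*' per partition directory level to the table base path.

    Hypothetical helper for illustration only; not a Hudi or Spark API.
    """
    if depth < 1:
        raise ValueError("depth must be at least 1")
    return base_path.rstrip("/") + "/*" * depth


# Hypothetical base path; a partition value like
# "americas/brazil/sao_paulo" is three levels deep.
print(partition_glob("/tmp/my_hudi_table", 3))  # /tmp/my_hudi_table/*/*/*
```

The result would be passed to load() in place of basePath + "/*/*/*". Some older Hudi quickstart examples use one additional "/*" to match the files inside the partition directories, so if the directory-level glob still returns no rows it may be worth trying depth + 1.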