Search code examples
apache-sparkpysparkapache-spark-sqlparquet

Efficiently select key value parquet column in pyspark


I'm working with a rather sizable parquet table, and 99.9% of the data is contained in a single key:value column. For example:

# Cannot use hive metastore to access so we have to load this way
df = spark.read.parquet('hdfs://cluster/path/to/parquet') \
    .select('id, 'version', 'details')
df.printSchema()
>> root
 |-- id: string
 |-- version: string
 |-- details: map
 |    |-- key: string
 |    |-- value: struct
 |    |    |-- complex_struct_1: struct
 |    |    |    |-- complex_substruct_1: struct
 |    |    |    |    |-- ...
 |    |    |    |-- ...
 |    |    |-- complex_struct_2: struct
 |    |    |    |-- complex_substruct_n: struct
 |    |    |    |    |-- ...
 |    |    |    |-- ...
 |    |    |-- complex_field_n: struct

The column in question is details, possible keys are either key_1, key_2, both, or none. My question is how can I efficiently select only subfields belonging to key_1 (e.g. select details['key_1'].complex_struct_1.complex_substruct_1.field)?

Because the table is not in the hive metastore, I don't believe I can use spark.sql here, or if there would be any benefit. I understand how to naively load the DB, select the entire details column and then filter, but given that details column is absolutely massive (thousands of fields) and I only want a small subset, I would like to take advantage of the columnar access if possible here. Is this something I could do more efficiently, or is that beyond the capabilities of parquet?


Solution

  • You can use Spark SQL by creating a temp view:

    df.createOrReplaceTempView('df')
    df2 = spark.sql("""select details['key_1'].complex_struct_1.complex_substruct_1.field from df""")
    

    which should be efficient and supposedly only fetches the subset of results that you need. You can do df2.explain() to see how the query is actually executed.