python, apache-spark, pyspark, orc

Pyspark: Read in ORC data only for specific dates


I have 3 data files stored in ORC, partitioned by DATE.

/orc/orc_FLORIDA_2019-04-29/alloc_FLORIDA_2019-04-29/DATE=2019-04-29/myfile.snappy.orc

/orc/orc_FLORIDA_2019-04-29/avails_FLORIDA_2019-04-29/DATE=2019-04-29/myfile.snappy.orc

/orc/orc_FLORIDA_2019-04-29/orders_FLORIDA_2019-04-29/DATE=2019-04-29/myfile.snappy.orc

I pulled a week of data, so for each set of files the structure for the following days looks like this:

/orc/orc_FLORIDA_2019-04-30/alloc_FLORIDA_2019-04-30/DATE=2019-04-30/myfile.snappy.orc

/orc/orc_FLORIDA_2019-05-1/alloc_FLORIDA_2019-05-1/DATE=2019-05-1/myfile.snappy.orc

/orc/orc_FLORIDA_2019-05-2/alloc_FLORIDA_2019-05-2/DATE=2019-05-2/myfile.snappy.orc

ETC...

I know I can read in a single day's ORC file with the following commands:

alloc_orc = spark.read.orc("/orc/orc_FLORIDA_2019-04-30/alloc_FLORIDA_2019-04-30/")
avails_orc = spark.read.orc("/orc/orc_FLORIDA_2019-04-30/avails_FLORIDA_2019-04-30/")
orders_orc = spark.read.orc("/orc/orc_FLORIDA_2019-04-30/orders_FLORIDA_2019-04-30/")

How would I go about reading in the entire week of data for each table?

And if I had a month of data, could I read in only the first week? Or would I have to read in the entire month and then filter for the dates I want?

Do I need to tweak my file structure and save the output data like this for easier read-in?

/orc/orc_FLORIDA/alloc/DATE=2019-04-29/myfile.snappy.orc
/orc/orc_FLORIDA/alloc/DATE=2019-04-30/myfile.snappy.orc

/orc/orc_FLORIDA/avails/DATE=2019-04-29/myfile.snappy.orc
/orc/orc_FLORIDA/avails/DATE=2019-04-30/myfile.snappy.orc

ETC...

Any help is much appreciated!


Solution

  • If you change your structure so that DATE comes first, followed by alloc/avails, it will be easier for Spark to apply predicate pushdown and partition filters.

    /orc/orc_FLORIDA/DATE=2019-04-29/alloc/myfile.snappy.orc
    /orc/orc_FLORIDA/DATE=2019-04-30/alloc/myfile.snappy.orc
    /orc/orc_FLORIDA/DATE=2019-04-29/avails/myfile.snappy.orc
    /orc/orc_FLORIDA/DATE=2019-04-30/avails/myfile.snappy.orc
    
    from pyspark.sql.functions import col, input_file_name

    # set the predicate pushdown parameter
    spark.sql("set spark.sql.orc.filterPushdown=true").show()

    # read one week of files; alloc/avails can be extracted from file_name
    # if you need to add them as a column
    strt_date, end_date = "2019-04-29", "2019-05-06"   # the week you want to read
    spark.read.orc("/orc/orc_FLORIDA/").\
        filter((col("DATE") >= strt_date) & (col("DATE") < end_date)).\
        withColumn("file_name", input_file_name()).\
        show(10, False)
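
  • With this layout, the DATE filter is applied as partition pruning, so if you have a month of data Spark only reads the directories for the week you ask for; you do not have to read the whole month and filter afterwards. Below is a minimal sketch of deriving the dataset name (alloc/avails/orders) as a column, assuming the DATE=.../alloc/ layout above (the regular expression is just an illustration for that layout):

    from pyspark.sql.functions import col, input_file_name, regexp_extract

    # only the DATE=2019-04-29 .. 2019-05-05 directories are scanned
    week_df = spark.read.orc("/orc/orc_FLORIDA/").\
        filter((col("DATE") >= "2019-04-29") & (col("DATE") < "2019-05-06"))

    # pull alloc/avails/orders out of the file path
    week_df = week_df.withColumn(
        "dataset",
        regexp_extract(input_file_name(), r"DATE=[^/]+/([^/]+)/", 1)
    )

    week_df.groupBy("DATE", "dataset").count().show()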