apache-spark pyspark avro azure-databricks

Spark reading partitioned Avro is significantly slower than pointing to the exact location


I am trying to read Avro data that is partitioned by Year, Month and Day, and this is significantly slower than pointing Spark directly at the partition path. In the physical plan I can see that the partition filters are passed on, so Spark is not scanning the entire set of directories, yet the read is still much slower.

For example, reading the partitioned data like this:

profitLossPath="abfss://raw@"+datalakename+".dfs.core.windows.net/datawarehouse/CommercialDM.ProfitLoss/"
 
profitLoss = spark.read.\
    format("com.databricks.spark.avro").\
    option("header", "false").\
    option("inferSchema", "false").load(profitLossPath)
 
profitLoss.createOrReplaceTempView("ProfitLosstt")

df=sqlContext.sql("SELECT * \
                             FROM ProfitLosstt \
                             where Year= " + year + " and Month=" + month_nz + " and Day=" + date_nz )

This takes around 3 minutes.

In contrast, pointing to the exact location by building the path string finishes in 2 seconds:

profitLossPath="abfss://raw@"+datalakename+".dfs.core.windows.net/datawarehouse/CommercialDM.ProfitLoss/Year=" +year +"/Month=" + month_nz + "/Day=" + date_nz
 
profitLoss = spark.read.\
    format("com.databricks.spark.avro").\
    option("header", "false").\
    option("inferSchema", "false").load(profitLossPath)

 
profitLoss.createOrReplaceTempView("ProfitLosstt")

df=sqlContext.sql("SELECT * \
                             FROM ProfitLosstt "
                              )
                  
display(df)
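
A side note on the direct-path variant above: when Spark is pointed at a single partition directory, the Year, Month and Day columns are normally not part of the resulting DataFrame, because they are derived from directory names below the load root. One way to keep them is the `basePath` read option of Spark's file-based sources. The sketch below is a minimal illustration, not a tested recipe: `partition_path` and `read_one_day` are hypothetical helper names, it assumes the directories use unpadded numbers (as in `Month=6`), and whether the older `com.databricks.spark.avro` package honours `basePath` is an assumption.

```python
def partition_path(base_path, year, month, day):
    """Build the directory of a single day, e.g. <base>/Year=2020/Month=6/Day=26."""
    # int() rejects anything that is not a whole number and drops leading
    # zeros; this assumes the directories are named with unpadded numbers.
    return "{}/Year={}/Month={}/Day={}".format(
        base_path.rstrip("/"), int(year), int(month), int(day)
    )


def read_one_day(spark, base_path, year, month, day):
    """Read one partition directory while keeping Year/Month/Day as columns."""
    return (
        spark.read.format("com.databricks.spark.avro")
        # basePath tells Spark where partition discovery starts, so the
        # directory names below it are still turned into columns.
        .option("basePath", base_path)
        .load(partition_path(base_path, year, month, day))
    )
```

Only the one directory passed to `load` is listed, so this should keep the speed of the direct-path read while still exposing the partition columns for later filters or joins.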

The physical plan for the first (slower) query does show that the partition filter is passed on.

What could explain the discovery phase taking this long?

I can elaborate if you have any questions.


Solution

  • OK, the slowness was caused by Spark building the InMemoryFileIndex.

    Although partition pruning takes place, Spark first needs to discover the partitions and their files, and building that index is the slow step. This Stack Overflow post elaborates on it: here

    So the idea was to create an external table, so that this partition information is built up front. I did that with a script like this (I used an inline schema; you can use a schema file if you have one):

    -- Replace <datalakename> with your storage account name.
    CREATE EXTERNAL TABLE ProfitLossAvro
    PARTITIONED BY (Year int, Month int, Day int)
    ROW FORMAT SERDE
      'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
    STORED AS
      INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
      OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
    LOCATION 'abfss://raw@<datalakename>.dfs.core.windows.net/datawarehouse/CommercialDM.ProfitLoss/'
    TBLPROPERTIES (
        'avro.schema.literal'='{
          "name": "Microsoft.Hadoop.Avro.Specifications.ProfitLoss",
          "type": "record",
          "fields": [
            { "name":"MK_DatesID_TradeDate", "type":["int", "null"]},
            { "name":"MK_UCRAccountsID_AccountID", "type":["int", "null"]},
            { "name":"MK_ProductCategoriesID_ProductCategoryID", "type":["int", "null"]},
            { "name":"CurrencyCode", "type":["string", "null"]},
            { "name":"ProfitLoss", "type":["double", "null"]},
            { "name":"MK_PnLAmountTypesID_PLBookingTypeID", "type":["int", "null"]}
          ]
        }');
    

    If you query this table right away, you will get 0 rows, because the existing partitions are not registered automatically. You can add them with:

    msck repair table ProfitLossAvro
    

    Every time you add data to your data lake, you can then add the new partition, for example:

    ALTER TABLE ProfitLossAvro ADD PARTITION (Year=2020, Month=6, Day=26)
    

    If you then query your data with a command like the one below, it runs much faster:

    df=sqlContext.sql("select * \
                   from ProfitLossAvro \
                   where Year=" + year + " and Month=" + month_nz + " and Day=" + date_nz)
    
    display(df)
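
The ADD PARTITION step and the final query can also be wrapped in small helpers, so that the values come from a date object instead of loose strings. This is only a sketch under stated assumptions: `add_partition_sql`, `select_day_sql` and `register_and_read` are hypothetical names, and `IF NOT EXISTS` is added on the assumption that the statement may run more than once for the same day.

```python
import datetime


def add_partition_sql(table, day):
    """Return the ALTER TABLE statement that registers one day's partition."""
    return (
        "ALTER TABLE {} ADD IF NOT EXISTS "
        "PARTITION (Year={}, Month={}, Day={})"
    ).format(table, day.year, day.month, day.day)


def select_day_sql(table, day):
    """Return a query that filters on all three partition columns."""
    return (
        "SELECT * FROM {} WHERE Year={} AND Month={} AND Day={}"
    ).format(table, day.year, day.month, day.day)


def register_and_read(spark, table, day):
    """Register the partition (if needed) and read that day's rows."""
    spark.sql(add_partition_sql(table, day))
    return spark.sql(select_day_sql(table, day))
```

For example, `register_and_read(spark, "ProfitLossAvro", datetime.date(2020, 6, 26))` would issue the same two statements shown above for 26 June 2020.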