apache-spark · hive · hadoop-partitioning

Spark partitions - using DISTRIBUTE BY option


We have a Spark environment which should process 50 million rows. These rows contain a key column, and the number of unique keys is close to 2,000. I would like to process all of those 2,000 keys in parallel, so we are using a Spark SQL query like the following:

hiveContext.sql("select * from BigTbl DISTRIBUTE by KEY")

Subsequently we have a mapPartitions step that works nicely on all the partitions in parallel. The trouble is that the query creates only 200 partitions by default. With a command like the following I am able to increase the number of partitions:

hiveContext.sql("set spark.sql.shuffle.partitions=500");

However, during a real production run I will not know the number of unique keys in advance. I would like this to be managed automatically. Is there a way to do this?

Thanks

Bala


Solution

  • I would suggest that you use the repartition function, register the repartitioned DataFrame as a new temp table, and also cache it for faster processing.

    // Count the distinct key values to decide how many partitions to create.
    val distinctValues = hiveContext.sql("select KEY from BigTbl").distinct().count()

    // Repartition to the number of distinct keys and register the result
    // as another temp table.
    hiveContext.sql("select * from BigTbl DISTRIBUTE BY KEY")
      .repartition(distinctValues.toInt)
      .registerTempTable("NewBigTbl")

    // Cache the repartitioned table to improve query performance.
    hiveContext.cacheTable("NewBigTbl")


    For further queries, use "NewBigTbl"; a sketch of such follow-up use appears below.
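
For example, a follow-up query or the existing mapPartitions pass could run against the cached table like this (the aggregation and per-row logic are illustrative placeholders, not part of the original answer):

    // Query the cached, repartitioned temp table directly...
    val perKeyCounts = hiveContext.sql("select KEY, count(*) as cnt from NewBigTbl group by KEY")

    // ...or feed it into the existing per-partition processing.
    hiveContext.table("NewBigTbl").rdd.mapPartitions { rows =>
      rows.map(row => row.getAs[String]("KEY"))   // placeholder for the real logic
    }.count()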