Tags: apache-spark, pyspark, databricks

Run query in parallel in Spark Databricks


I’m developing a query of about 1000 lines that I use for data quality. Basically, this query computes about 10 metrics on each column (SUM, AVG, STDDEV, PERCENTILE, MIN, MAX, COUNT, …). The problem is that some DataFrames have more than 100 columns. I tried two approaches to solve this problem.

The first one is running a lot of small queries and unioning them all into a single DataFrame using Spark SQL, like:

SELECT AVG (col1) as metric from table
UNION ALL 
SELECT MAX (col1) as metric from table
UNION ALL
SELECT MIN (col1) as metric from table
UNION ALL 
SELECT STDDEV (col1) as metric from table
UNION ALL
SELECT AVG (col2) as metric from table
UNION ALL 
SELECT MAX (col2) as metric from table
UNION ALL
SELECT MIN (col2) as metric from table
UNION ALL 
SELECT STDDEV (col2) as metric from table
…
SELECT STDDEV (col100) as metric from table

I cached my table’s DataFrame. Even so, this query runs forever on my cluster.

My second approach was:

SELECT AVG (col1) as avgcol1,
       MAX (col1) as maxcol1,
       MIN (col1) as mincol1,
       STDDEV (col1) as stddevcol1,
       AVG (col2) as avgcol2,
       MAX (col2) as maxcol2,
       MIN (col2) as mincol2,
       STDDEV (col2) as stddevcol2,
       …
FROM table

This approach also runs forever on my cluster.

My cluster has 1 driver and 1 worker (4 cores and 8GB each).

My question is: can I solve this performance problem without scaling my cluster? If so, what is the best approach?

I thought about using a ThreadPool on Databricks to solve this performance issue. But since Spark is distributed, is a ThreadPool the best approach?

Or should I use Spark’s parallelism options (partitionColumn, upperBound, lowerBound, and numPartitions) to solve it?

NOTE: For some metrics I use subqueries with window functions (on string-type columns).


Solution

  • If you want to do data profiling against the data, instead of writing Spark SQL queries you can simply use the following built-in commands:

    describe: count, mean, stddev, min, max
    summary: describe + interquartile range (IQR)
    


    display(fixed_price_df.describe())
    display(fixed_price_df.summary())