Tags: sql, apache-spark, parallel-processing, apache-spark-sql, hadoop-yarn

How to optimize Spark SQL to run in parallel


I am a Spark newbie and have a simple Spark application that uses Spark SQL/HiveContext to:

  1. select data from a Hive table (1 billion rows)
  2. do some filtering and aggregation, including row_number over a window function to select the first row, group by, count(), max(), etc.
  3. write the result into HBase (hundreds of millions of rows)

I submit the job to a YARN cluster (100 executors). It's slow, and when I look at the DAG visualization in the Spark UI, it seems only the Hive table scan tasks run in parallel; the rest of steps #2 and #3 above appear to run in a single instance, which I assume should be possible to parallelize.

The application looks like:

Step 1:

val input = hiveContext.sql(
  """
    SELECT
        user_id
        , address
        , age
        , phone_number
        , first_name
        , last_name
        , server_ts
    FROM
    (
        SELECT
            user_id
            , address
            , age
            , phone_number
            , first_name
            , last_name
            , server_ts
            , row_number() over
              (partition by user_id, address, phone_number, first_name, last_name
               order by user_id, address, phone_number, first_name, last_name, server_ts desc, age) AS rn
        FROM
        (
            SELECT
                user_id
                , address
                , age
                , phone_number
                , first_name
                , last_name
                , server_ts
            FROM
                table
            WHERE
                phone_number <> '911' AND
                server_date >= '2015-12-01' AND server_date < '2016-01-01' AND
                user_id IS NOT NULL AND
                first_name IS NOT NULL AND
                last_name IS NOT NULL AND
                address IS NOT NULL AND
                phone_number IS NOT NULL
        ) all_rows
    ) all_rows_with_row_number
    WHERE rn = 1
  """)

input.registerTempTable("input_tbl")
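For reference, the physical plan for this query can be printed to see which operators it maps to; this is just a minimal diagnostic sketch, assuming the Spark 1.x DataFrame API:

// Prints the logical and physical plans: Project/Filter correspond to the column
// selection and WHERE clause, and Exchange marks the shuffle needed for the
// row_number() window partitioning.
input.explain(true)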

Step 2:

val result = hiveContext.sql(
  """
    SELECT state,
           phone_number,
           address,
           COUNT(*) AS hash_count,
           MAX(server_ts) AS latest_ts
    FROM
    (   SELECT
            udf_getState(address) AS state
            , user_id
            , address
            , age
            , phone_number
            , first_name
            , last_name
            , server_ts
        FROM
            input_tbl ) input
    WHERE state IS NOT NULL AND state != ''
    GROUP BY state, phone_number, address
  """)
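One setting that relates to the Exchange steps (shuffles) behind the window function and the GROUP BY is spark.sql.shuffle.partitions, which defaults to 200 in Spark 1.x. A minimal sketch of raising it before running the queries; the value 400 is only illustrative:

// Controls how many partitions (and therefore tasks) every Spark SQL shuffle
// produces, e.g. the Exchange behind row_number() and the GROUP BY above.
// The value below is only an example.
hiveContext.setConf("spark.sql.shuffle.partitions", "400")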

Step 3:

result.cache()
result.map(x => ...).saveAsNewAPIHadoopDataset(conf)
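For context, the HBase write is roughly of the following shape. The row key, column family and qualifiers below are placeholders rather than the real (elided) mapping, server_ts is assumed to be stored as a string, the HBase 1.x client API is assumed, and `conf` is the same Hadoop Configuration as above, expected to carry the TableOutputFormat settings (output table, ZooKeeper quorum, etc.):

import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes

// Each Row of `result` becomes one Put; the row key, the column family "cf"
// and the qualifiers are placeholder names.
val hbasePuts = result.map { row =>
  val put = new Put(Bytes.toBytes(row.getAs[String]("phone_number")))
  put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("hash_count"),
    Bytes.toBytes(row.getAs[Long]("hash_count")))
  put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("latest_ts"),
    Bytes.toBytes(row.getAs[String]("latest_ts")))
  (new ImmutableBytesWritable, put)
}

// `conf` is assumed to be configured for TableOutputFormat (output table, ZooKeeper quorum, ...).
hbasePuts.saveAsNewAPIHadoopDataset(conf)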

The DAG visualization looks like this: [screenshot of the DAG visualization]

As you can see, the "Filter", "Project" and "Exchange" steps in stage 0 appear to run in only one instance, and the same goes for stage 1 and stage 2. So a few questions (apologies if they are dumb):

  1. Do "Filter", "Project" and "Exchange" happen in the driver after data is shuffled from each executor?
  2. What code maps to "Filter", "Project" and "Exchange"?
  3. How could I run "Filter", "Project" and "Exchange" in parallel to improve performance?
  4. Is it possible to run stage 1 and stage 2 in parallel?

Solution

  • You're not reading the DAG graph correctly - the fact that each step is visualized using a single box does not mean that it isn't using multiple tasks (and therefore cores) to calculate that step.

    You can see how many tasks are used for each step by drilling down into the stage view, which displays all tasks for that stage.

    For example, here's a sample DAG visualization similar to yours:

    [screenshot of a sample DAG visualization]

    You can see that each stage is depicted as a "single" column of steps.

    But if we look at the table below, we can see the number of tasks per stage:

    [screenshot of the stage table showing the number of tasks per stage]

    One of them uses only 2 tasks, but the other uses 220, which means the data is split into 220 partitions and those partitions are processed in parallel, given enough available resources.

    If you drill down into that stage, you can see again that it used 220 tasks, along with details for all of them.

    [screenshot of the stage detail view listing the 220 tasks]

    Only tasks reading data from disk are shown in the graph with these "multiple dots", to help you understand how many files were read.

    So, as Rashid's answer suggests, check the number of tasks for each stage; a quick way to cross-check those counts from code is sketched below.
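    A sketch of such a cross-check against the question's DataFrames (Spark 1.x API assumed); each partition is processed by one task, so these counts should line up with the task counts shown in the stage table:

    // Number of partitions of each computed result; every partition becomes one
    // task in the stage that materializes it.
    println(s"input partitions:  ${input.rdd.partitions.length}")
    println(s"result partitions: ${result.rdd.partitions.length}")

    // If a stage shows too few tasks, repartitioning before the expensive step,
    // e.g. result.repartition(200), spreads the work at the cost of an extra shuffle.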