Search code examples
hadoopapache-sparkquery-optimizationapache-spark-sql

What affects amount of data shuffled in spark


For example Im executing some queries on spark, and in the spark UI I can see that some queries have more shuffle , and this shuffle seems that is the amount of data read locally and read between executors.

But so Im not understanding one thing, for example this query below loaded 7GB from HDFS but the suffle read + shuffled write is more than 10GB. But I saw other queries that load also 7GB from HDFS and the shuffle is like 500kb. So Im not understanding this, can you please help? The amount of data shuffled is not related in the data read from hdfs?

select 
  nation, o_year, sum(amount) as sum_profit
from 
  (
select 
  n_name as nation, year(o_orderdate) as o_year, 
  l_extendedprice * (1 - l_discount) -  ps_supplycost * l_quantity as amount
    from
      orders o join
      (select l_extendedprice, l_discount, l_quantity, l_orderkey, n_name, ps_supplycost 
       from part p join
         (select l_extendedprice, l_discount, l_quantity, l_partkey, l_orderkey, 
                 n_name, ps_supplycost 
          from partsupp ps join
            (select l_suppkey, l_extendedprice, l_discount, l_quantity, l_partkey, 
                    l_orderkey, n_name 
             from
               (select s_suppkey, n_name 
                from nation n join supplier s on n.n_nationkey = s.s_nationkey
               ) s1 join lineitem l on s1.s_suppkey = l.l_suppkey
            ) l1 on ps.ps_suppkey = l1.l_suppkey and ps.ps_partkey = l1.l_partkey
         ) l2 on p.p_name like '%green%' and p.p_partkey = l2.l_partkey
     ) l3 on o.o_orderkey = l3.l_orderkey
  )profit
group by nation, o_year
order by nation, o_year desc;

Solution

  • The shuffle is Spark’s mechanism for re-distributing data so that it’s grouped differently across partitions. This typically involves copying data across executors and machines. So its pretty clear here that shuffled data is not really dependent on the amount of the input data. However, it depends upon what operations you perform on the input data, which leads to the movement of data across executors( and hence machines). Please go through http://spark.apache.org/docs/latest/programming-guide.html#shuffle-operations to know and understand why shuffling is a costly process.

    Looking at the query you have pasted, it seems you are doing a lot of join operations (haven't looked deep to understand the ultimate operation you are doing). And that definitely calls for moving the data across partitions. The problem can be handled by revisiting the query and optimizing the same or manipulating or pre-procesing your input data in a manner which leads to less movement of data ( For ex: colocating the data which has be joined so that they fall in same partition). Again, this is just an example and you have to determine from your use case on what works best for you.