python, apache-spark, distributed-computing

Spark broadcast vs join


I have a large RDD (rdd_1) and a filtered subset (rdd_2) of it. I want to join rdd_1 and rdd_2 on a different field.

Let's say records are of format {'first_name':<>, 'last_name':<>}. We want to find all names that have the same last name as all the 'jack's.

import operator

names = sc.textFile(RAW_DATA)
jack = names.filter(lambda v: v['first_name'] == 'jack')

Option 1

jack_last_names = jack.map(operator.itemgetter('last_name')).distinct().collect()
last_names_bc = sc.broadcast(set(jack_last_names))
final = names.filter(lambda v: v['last_name'] in last_names_bc.value)

Currently, I broadcast rdd_2 and filter rdd_1 by it. The trouble is that in order to broadcast rdd_2, I first have to collect() it on the driver, and that causes the driver to run out of memory.

Is there a way to broadcast an RDD without first collect()ing it on the driver?

Option 2

final = jack.keyBy(operator.itemgetter('last_name')).join(names.keyBy(operator.itemgetter('last_name')))

My other option is rdd_1.join(rdd_2) but rdd_1 is way too big to shuffle.

When we run rdd_1.join(rdd_2) do both rdd_1 and rdd_2 get hash partitioned and shuffled?

Thanks!


Solution

  • Is there a way to broadcast an RDD without first collect()ing it on the driver?

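    No, there isn't, and even if there were, it wouldn't solve your problem.

    • It is not possible to execute a nested action or transformation, so one RDD cannot be used from inside a function applied to another RDD.
    • Even if you could create a local broadcast variable without collecting, you would face the same memory problem, only on the workers instead of the driver.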

  • When we run rdd_1.join(rdd_2) do both rdd_1 and rdd_2 get hash partitioned and shuffled?

    Technically, in PySpark a join is implemented as a union followed by groupByKey, so yes: all of the data from both RDDs has to be shuffled.
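To make the union-then-groupByKey idea concrete, here is a minimal sketch in plain Python, with lists standing in for RDDs. The helper names (keyed, join_via_union_and_group) and the sample records are made up for illustration; this only approximates the shape of the computation and is not PySpark's actual code. The point is that every record from both sides has to pass through the grouping step, which in Spark is where the shuffle happens.

```python
from collections import defaultdict
from operator import itemgetter


def keyed(records, field):
    """Stand-in for rdd.keyBy(itemgetter(field)): build (key, record) pairs."""
    get = itemgetter(field)
    return [(get(r), r) for r in records]


def join_via_union_and_group(left, right):
    """Inner join of two lists of (key, value) pairs.

    Hypothetical helper: tag each value with its side, union both lists,
    group by key, then pair up left and right values that share a key.
    """
    # Tag and union: both inputs end up in one collection.
    tagged = [(k, (1, v)) for k, v in left] + [(k, (2, v)) for k, v in right]

    # Group by key: in Spark this is the step that shuffles ALL tagged records.
    groups = defaultdict(lambda: ([], []))
    for k, (side, v) in tagged:
        groups[k][side - 1].append(v)

    # Emit one output pair per (left value, right value) combination.
    return [(k, (l, r)) for k, (ls, rs) in groups.items() for l in ls for r in rs]


# Made-up sample data in the question's record format.
names = [
    {"first_name": "jack", "last_name": "smith"},
    {"first_name": "anna", "last_name": "smith"},
    {"first_name": "bob", "last_name": "jones"},
]
jack = [r for r in names if r["first_name"] == "jack"]

joined = join_via_union_and_group(keyed(jack, "last_name"), keyed(names, "last_name"))
same_last_name = [right["first_name"] for _, (_, right) in joined]
```

Note that the record for 'bob' is still grouped under its key even though it produces no output, which is why the size of the larger input dominates the cost.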

    In practice, I would simply accept the cost of shuffling and move on. In general, it is not possible to write a complex application and avoid shuffling completely. Moreover, a shuffle is no more expensive than broadcasting a similar amount of data, or even than copying the data to a distributed file system with replication.
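As a rough back-of-envelope comparison of those three costs, the sketch below counts only the bytes that must be moved or written, under simplifying assumptions: every executor receives a full copy of a broadcast value, a shuffle sends each record to exactly one partition with keys spread uniformly, and a replicated write stores one copy per replica. The function names, the 10 GiB size, the 20 executors and the replication factor of 3 are all hypothetical; real costs also depend on serialization, disk I/O and skew.

```python
GIB = 1024 ** 3


def broadcast_bytes(data_bytes, num_executors):
    """Assumption: every executor ends up holding a full copy."""
    return data_bytes * num_executors


def shuffle_bytes(data_bytes, num_executors):
    """Assumption: uniform hashing, so (n - 1) / n of the data leaves its node."""
    return data_bytes * (num_executors - 1) // num_executors


def replicated_write_bytes(data_bytes, replication):
    """Assumption: one stored copy per replica."""
    return data_bytes * replication


size = 10 * GIB   # hypothetical data size
executors = 20    # hypothetical cluster size

costs = {
    "broadcast": broadcast_bytes(size, executors),
    "shuffle": shuffle_bytes(size, executors),
    "replicated write": replicated_write_bytes(size, 3),
}
for name, cost in costs.items():
    print(f"{name}: {cost / GIB:.1f} GiB")
```

Under these assumptions the shuffle moves less data than either alternative, which is consistent with the advice to accept it rather than work around it.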