Tags: python-3.x, pyspark, apache-spark-sql, parquet, pyarrow

How to perform parallel computation on Spark Dataframe by row?


I have a collection of 300,000 points and I would like to compute the distance between each pair of them.

    id      x    y
0   0       1    0
1   1       28   76
…

So I take the Cartesian product of those points and filter it so that each pair of points is kept only once; for my purpose the distance between points (0, 1) is the same as between (1, 0).

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
import math


@udf(returnType=DoubleType())
def compute_distance(x1, y1, x2, y2):
    # Euclidean distance between (x1, y1) and (x2, y2)
    return math.sqrt(math.pow(x1 - x2, 2) + math.pow(y1 - y2, 2))


columns = ['id', 'x', 'y']
data = [(0, 1, 0), (1, 28, 76), (2, 33, 42)]
spark = SparkSession\
            .builder \
            .appName('distance computation') \
            .config('spark.sql.execution.arrow.pyspark.enabled', 'true') \
            .config('spark.executor.memory', '2g') \
            .master('local[20]') \
            .getOrCreate()
rdd = spark.sparkContext.parallelize(data)
df = rdd.toDF(columns)
result = df.alias('a')\
               .join(df.alias('b'),
                     F.array(*['a.id']) < F.array(*['b.id']))\
               .withColumn('distance', compute_distance(F.col('a.x'), F.col('a.y'), F.col('b.x'), F.col('b.y')))

result.write.parquet('distance-between-points')

While that seems to work, the CPU usage for my latest task (parquet at NativeMethodAccessorImpl.java:0) did not go above 100%. Also, it took about a day to complete.

Is the withColumn operation performed on multiple executors in order to achieve parallelism?

Is there a way to split the data so the distances are computed in batches, and to store the result in one or multiple Parquet files?

Thanks for your insight.


Solution

  • Is the withColumn operation performed on multiple executors in order to achieve parallelism?

    Yes, assuming a correctly configured cluster, the dataframe will be partitioned across your cluster and the executors will work through the partitions in parallel running your UDF.
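
    As a quick check (a sketch reusing the names from your snippet; the numbers are only illustrative), you can look at how many partitions the DataFrame actually has and repartition it before the self-join, so that all of the cores behind local[20] receive work:

        # How many partitions did parallelize() create? With a small driver-side
        # list this is often just a handful, which caps the parallelism.
        print(df.rdd.getNumPartitions())

        # Spread the rows over more partitions before the self-join so every
        # core has at least one partition to process (20 matches local[20]).
        df = df.repartition(20)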

    Is there a way to split the data so the distances are computed in batches, in parallel, and to store the result in one or multiple Parquet files?

    By default, the resulting dataframe will be partitioned across the cluster and written out as one Parquet file per partition. You can change that by repartitioning if required, but that will result in a shuffle and take longer.
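
    As an illustration (a sketch; the partition counts and output paths here are made up for the example), the number of output files follows the number of partitions of result, and you can change it just before writing:

        # Default: one Parquet file per partition of result
        result.write.parquet('distance-between-points')

        # Fewer, larger files: coalesce before writing (avoids a full shuffle)
        result.coalesce(1).write.parquet('distance-single-file')

        # More files and more write parallelism: repartition (triggers a shuffle)
        result.repartition(200).write.parquet('distance-many-files')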

    I recommend the 'Level of Parallelism' section in the Learning Spark book for further reading.
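
    As a starting point (the values below are illustrative, not tuned recommendations), the relevant settings can be supplied when you build the session:

        from pyspark.sql import SparkSession

        spark = (
            SparkSession.builder
            .appName('distance computation')
            # Number of partitions used after shuffles such as joins; the default is 200
            .config('spark.sql.shuffle.partitions', '200')
            # Default number of partitions for RDD operations such as parallelize()
            .config('spark.default.parallelism', '20')
            .master('local[20]')
            .getOrCreate()
        )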