
Pyspark lazy evaluation in loops too slow


First of all, I want to let you know that I am still very new to Spark and getting used to the lazy-evaluation concept.

Here is my issue:

I have two Spark DataFrames that I load from CSV.GZ files. What I am trying to do is merge both tables in order to split the first table according to keys that I have in the second one.

For example:

Table A

+----------+---------+--------+---------+------+
|      Date|     Zone|       X|     Type|Volume|
+----------+---------+--------+---------+------+
|2019-01-16|010010000|       B|        A|   684|
|2019-01-16|010020000|       B|        A| 21771|
|2019-01-16|010030000|       B|        A|  7497|
|2019-01-16|010040000|       B|        A| 74852|

Table B

+----+---------+
|Dept|     Zone|
+----+---------+
|  01|010010000|
|  02|010020000|
|  01|010030000|
|  02|010040000|

Then when I merge both tables I have:

+---------+----------+--------+---------+------+----+
|     Zone|      Date|       X|     Type|Volume|Dept|
+---------+----------+--------+---------+------+----+
|010010000|2019-01-16|       B|        A|   684|  01|
|010020000|2019-01-16|       B|        A| 21771|  02|
|010030000|2019-01-16|       B|        A|  7497|  01|
|010040000|2019-01-16|       B|        A| 74852|  02|

So what I want to do is split this table into Y disjoint tables, where Y is the number of distinct 'Dept' values found in the merged table.

So for example:

Result1:

+---------+----------+--------+---------+------+----+
|     Zone|      Date|       X|     Type|Volume|Dept|
+---------+----------+--------+---------+------+----+
|010010000|2019-01-16|       B|        A|   684|  01|
|010030000|2019-01-16|       B|        A|  7497|  01|

Result2:

+---------+----------+--------+---------+------+----+
|     Zone|      Date|       X|     Type|Volume|Dept|
+---------+----------+--------+---------+------+----+
|010020000|2019-01-16|       B|        A| 21771|  02|
|010040000|2019-01-16|       B|        A| 74852|  02|

My code looks like this:

import os

sp_df_A = spark.read.csv(file_path_A, header=True, sep=';', encoding='cp1252')
sp_df_B = spark.read.csv(file_path_B, header=True, sep=';', encoding='cp1252')

sp_merged_df = sp_df_A.join(sp_df_B, on=['Zone'], how='left')


# list of unique 'Dept' values on the merged DataFrame
unique_buckets = [x['Dept'] for x in sp_merged_df.select('Dept').distinct().collect()]


# Iterate over all 'Dept' found
for zone_bucket in unique_buckets:
    print(zone_bucket)
    bucket_dir = os.path.join(output_dir, 'Zone_%s' % zone_bucket)
    if not os.path.exists(bucket_dir):
        os.mkdir(bucket_dir)
    # Filter target 'Dept'
    tmp_df = sp_merged_df.filter(sp_merged_df['Dept'] == zone_bucket)
    # write result
    tmp_df.write.format('com.databricks.spark.csv').option('codec', 'org.apache.hadoop.io.compress.GzipCodec').save(bucket_dir, header = 'true')

The thing is that this very simple code takes far too long to write a result. My guess is that, because of lazy evaluation, the data is loaded, merged and filtered again on every iteration of the loop.

Can this be the case?


Solution

  • Your guess is correct. Your code reads, joins and filters all the data once for every bucket. This is indeed caused by Spark's lazy evaluation.

    Spark defers every data transformation until an action is performed. When an action is called, Spark looks at all the pending transformations and builds a plan for computing the action's result efficiently. Your program blocks while Spark executes this plan. Once Spark is done, the program continues and Spark "forgets" everything it has computed, so the next action starts from scratch.

    In your case Spark "forgets" the joined DataFrame sp_merged_df and reconstructs it each time .collect() or .save() is called.
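
    The recomputation can be imitated in plain Python. The sketch below is only a toy model and not how Spark is implemented; LazyFrame, count_source_reads and expensive_load_and_join are hypothetical names made up for this illustration. It counts how often the "expensive" source is rebuilt when one action runs per bucket.

```python
class LazyFrame:
    """Toy stand-in for a lazily evaluated dataset (not a Spark API)."""

    def __init__(self, compute):
        self._compute = compute    # zero-argument function that builds the rows
        self._use_cache = False
        self._cached = None

    def filter(self, predicate):
        # Transformation: only records what to do, nothing runs yet.
        return LazyFrame(lambda: [r for r in self.collect() if predicate(r)])

    def cache(self):
        # Only marks the data for reuse; it is stored by the first action.
        self._use_cache = True
        return self

    def collect(self):
        # Action: runs the recorded work, or reuses the stored rows.
        if self._use_cache and self._cached is not None:
            return self._cached
        rows = self._compute()
        if self._use_cache:
            self._cached = rows
        return rows


def count_source_reads(use_cache):
    """Return how many times the source is rebuilt for one action per bucket."""
    reads = 0

    def expensive_load_and_join():
        nonlocal reads
        reads += 1
        return [("010010000", "01"), ("010020000", "02"),
                ("010030000", "01"), ("010040000", "02")]

    merged = LazyFrame(expensive_load_and_join)
    if use_cache:
        merged.cache()

    # First action: find the distinct 'Dept' values.
    depts = sorted({dept for _, dept in merged.collect()})
    # One further action per bucket.
    for d in depts:
        merged.filter(lambda row, d=d: row[1] == d).collect()
    return reads


print(count_source_reads(use_cache=False))  # 3: once for the list, once per bucket
print(count_source_reads(use_cache=True))   # 1: later actions reuse the stored rows
```

    With two buckets the uncached version rebuilds the source three times; with hundreds of buckets the cost grows accordingly.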

    If you want Spark to "remember" an RDD or DataFrame, you can .cache() it (see docs). In your code that means calling .cache() on sp_merged_df right after the join.
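
    Applied to the code from the question, this could look roughly like the sketch below. The function name write_buckets is hypothetical, the DataFrames are assumed to be loaded as in the question, and the built-in DataFrameWriter.csv with compression='gzip' is swapped in for the 'com.databricks.spark.csv' format. It also assumes the joined data fits into the cluster's cache storage.

```python
import os


def write_buckets(df_a, df_b, output_dir):
    """Join df_a with df_b on 'Zone' and write one gzipped CSV directory per 'Dept'."""
    # cache() only marks the joined DataFrame for reuse; Spark stores the data
    # when the first action (the collect() below) runs.
    merged = df_a.join(df_b, on=['Zone'], how='left').cache()

    # First action: runs the read and the join once and populates the cache.
    depts = [row['Dept'] for row in merged.select('Dept').distinct().collect()]

    for dept in depts:
        bucket_dir = os.path.join(output_dir, 'Zone_%s' % dept)
        # Each write is still a separate action, but it now starts from the
        # cached join result instead of re-reading and re-joining the files.
        # Assumption: bucket_dir does not exist yet; Spark creates it itself.
        tmp_df = merged.filter(merged['Dept'] == dept)
        tmp_df.write.csv(bucket_dir, header=True, compression='gzip')

    # Release the cached data once all buckets are written.
    merged.unpersist()
    return depts
```

    It would be called as write_buckets(sp_df_A, sp_df_B, output_dir). The filter still runs once per bucket, but each run scans the cached join result rather than the original files.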