Tags: apache-spark, for-loop, pyspark, parallel-processing, amazon-emr

Will this method force parallelization of "for" loops in Spark?


I have a nested for loop that does 10 iterations on 4 time windows. The pseudocode is something like this:

df=spark.read.parquet("very large dataset")
for i in range(1,5):
    time_window= 5 * i
    for j in range(0,10):
        df_out=[operations performed in 10 different slices of the time window]
        df_out.write.parquet("output path")

I know that each loop runs sequentially, so Spark's parallel processing is limited to the operations inside each inner iteration. In other words, across the 40 total iterations (4 x 10), Spark's distributed computing only happens within the operations defined in each inner loop.
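
For concreteness, a rough sketch of the kind of thing each inner iteration does (the event_time column, the grouping, the aggregation, and the output paths here are only placeholders, not my real logic):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("very large dataset")

for i in range(1, 5):
    time_window = 5 * i
    slice_len = time_window / 10  # split the window into 10 slices
    for j in range(0, 10):
        # placeholder stand-in for "operations performed in 10 different
        # slices of the time window": filter one slice, then aggregate it
        df_out = (
            df.filter(
                (F.col("event_time") >= j * slice_len)
                & (F.col("event_time") < (j + 1) * slice_len)
            )
            .groupBy("some_key")
            .agg(F.count("*").alias("n_events"))
        )
        df_out.write.parquet(f"output path/window={time_window}/slice={j}")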

I was wondering: what if I modify the code so that I manually write out the outer loop 4 times, like so?

df=spark.read.parquet("very large dataset")\
.persist(StorageLevel.MEMORY_AND_DISK)

time_window= 5 * 1
for j in range(0,10):
    df_out1=[operations performed in 10 different slices of the time window]
    df_out1.write.parquet("output path")
    
time_window= 5 * 2
for j in range(0,10):
    df_out2=[operations performed in 10 different slices of the time window]
    df_out2.write.parquet("output path")
    
time_window= 5 * 3
for j in range(0,10):
    df_out3=[operations performed in 10 different slices of the time window]
    df_out3.write.parquet("output path")

time_window= 5 * 4
for j in range(0,10):
    df_out4=[operations performed in 10 different slices of the time window]
    df_out4.write.parquet("output path")    

Since the output of each unrolled block is independent of the preceding one, those operations should be able to run in parallel. So instead of running the 40 iterations sequentially (one at a time), will Spark run 4 at a time with this modification?


Solution

  • Unrolling the outer loop by hand won't change anything by itself: the driver still executes those blocks one after another, so the jobs are still submitted sequentially. To process the four time windows concurrently you have to submit their jobs from separate threads, for example with Python's multiprocessing ThreadPool. Something like this with your pseudocode:

    from multiprocessing.pool import ThreadPool

    # define a function which takes the original DataFrame and a time_window argument
    def do_something(df, time_window):
        for j in range(0, 10):
            df_out = ...  # operations performed in 10 different slices of the time window
            df_out.write.parquet("output path")

    # run a pool of 4 worker threads, one per time window;
    # starmap unpacks each (df, time_window) tuple into the function's arguments
    with ThreadPool(4) as p:
        args = [(df, 5 * i) for i in range(1, 5)]
        p.starmap(do_something, args)
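
  • A note on why a ThreadPool (threads) rather than a process pool: the SparkSession and the persisted DataFrame can be shared by all four worker threads, and each thread simply submits its jobs to the same driver. Whether those jobs then overlap on the cluster also depends on Spark's job scheduler; within one application jobs are scheduled FIFO by default, so enabling FAIR scheduling lets concurrently submitted jobs share the executors more evenly. A minimal sketch, assuming you build the session yourself (the app name and pool name are only illustrative):

    from pyspark.sql import SparkSession

    # enable FAIR scheduling so that jobs submitted from different threads
    # share executors instead of strictly queueing behind each other
    spark = (
        SparkSession.builder
        .appName("parallel-time-windows")  # illustrative name
        .config("spark.scheduler.mode", "FAIR")
        .getOrCreate()
    )

    def do_something(df, time_window):
        # tag the jobs submitted from this thread with a scheduler pool;
        # a pool not declared in an allocation file just gets default settings
        spark.sparkContext.setLocalProperty("spark.scheduler.pool", "window_pool")
        for j in range(0, 10):
            df_out = ...  # same per-slice operations as above
            df_out.write.parquet("output path")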