python, pandas, pyspark, dask, spark-koalas

Pandas parallel apply with Koalas (PySpark)


I'm new to Koalas (PySpark). I tried to use Koalas for a parallel apply, but it seemed to use only a single core for the whole operation (correct me if I'm wrong), so I ended up using Dask for the parallel apply (via map_partitions), which worked pretty well; a sketch of that approach is shown below.
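
For reference, this is roughly how I used Dask with map_partitions (a minimal sketch; the body of my_prep and the partition count are placeholders, not my real code):

import dask.dataframe as dd

def my_prep(value):
    return value.strip().lower()  # placeholder string operation

ddf = dd.read_parquet('my_big_file')
ddf = ddf.repartition(npartitions=8)  # split the single partition so work spreads across cores
ddf['new_column'] = ddf['string_column'].map_partitions(
    lambda part: part.apply(my_prep),  # pandas apply runs once per partition, in parallel
    meta=('new_column', 'object'),
)
ddf.to_parquet('my_big_file_modified')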

However, I would like to know if there's a way to utilize Koalas for parallel apply.

The basic code I used for the operation is below.

import pandas as pd
import databricks.koalas as ks

my_big_data = ks.read_parquet('my_big_file') # the file is a single-partition parquet file

my_big_data['new_column'] = my_big_data['string_column'].apply(my_prep) # my_prep does string operations


my_big_data.to_parquet('my_big_file_modified') # needed because Koalas evaluates lazily


Solution

  • I found a GitHub issue that discusses this problem: https://github.com/databricks/koalas/issues/1280

    If the number of rows the function is applied to is below 1,000 (the default value of compute.shortcut_limit), Koalas falls back to a plain pandas DataFrame to do the operation.

    The user-defined function above, my_prep, is applied row by row, so single-core pandas was being used.

    To force it to run in a PySpark (parallel) manner, modify the configuration as below.

    import databricks.koalas as ks
    ks.set_option('compute.default_index_type', 'distributed') # use when .head() calls are too slow
    ks.set_option('compute.shortcut_limit', 1) # Koalas will use PySpark instead of the pandas shortcut
    

    Also, explicitly specifying the return type (type hint) on the user-defined function keeps Koalas from taking the shortcut path and makes the apply run in parallel.

    def my_prep(row) -> str:  # the return type hint keeps Koalas off the pandas shortcut path
        return row

    kdf['my_column'].apply(my_prep)
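
    Putting the pieces together, a minimal end-to-end sketch (the body of my_prep below is a hypothetical placeholder, since the question does not show the real string operations):

    import databricks.koalas as ks

    ks.set_option('compute.default_index_type', 'distributed')  # avoid building a sequential default index
    ks.set_option('compute.shortcut_limit', 1)  # disable the pandas shortcut so apply runs on Spark

    def my_prep(row) -> str:  # explicit return type hint also keeps Koalas off the shortcut path
        return row.strip().lower()  # placeholder string operation

    kdf = ks.read_parquet('my_big_file')
    kdf['new_column'] = kdf['string_column'].apply(my_prep)
    kdf.to_parquet('my_big_file_modified')  # triggers the lazy computation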