python · databricks · aws-databricks · dbutils

Databricks parallelize file system operations


This question applies broadly, but the specific use case is concatenating all fragmented files from a number of directories.

The crux of the question is optimizing and inspecting the parallelism of file system operations performed from Databricks.

Notes:

  • The cluster has 128 cores on the driver and 1 worker with 8. The rationale is that file system operations don't run on the executors, so the worker side can be kept small.

  • All files live in an external S3 bucket, not DBFS.

    fragmented_files/
        entity_1/
            fragment_1.csv
            ...
            fragment_n.csv
        ...
        entity_n/
            fragment_1.csv
            ...
            fragment_n.csv

    merged_files/
        entity_1/
            merged.csv
        ...
        entity_n/
            merged.csv

I have working code, the gist of which is

import concurrent.futures

def concat_files(fragged_dir, new):
    # fragged_dir: iterable of fragment file paths for one entity
    # new: path of the merged output file
    with open(new, "w") as nout:
        for orig_frag_file in fragged_dir:
            with open(orig_frag_file) as o_file:
                nout.write(o_file.read())

with concurrent.futures.ThreadPoolExecutor() as executor:
    # The second iterable supplies the matching merged-file path for each directory
    results = executor.map(concat_files,
                           all_directories_with_fragmented_files,
                           all_merged_file_paths)

Questions:

  • For file system operations, or anything that doesn't produce a Spark UI display, how can I verify that I'm actually using all the driver cores, rather than just queueing everything up to run on one?

  • How would ThreadPoolExecutor vs. ProcessPoolExecutor vary here?

  • Is there an advantage to using the dbutils API vs. regular Python?


Solution

  • With ThreadPoolExecutor, the threads run in the same process and share the same memory space. Because of the Global Interpreter Lock (GIL) in CPython, this may not give as much parallelism as separate processes with ProcessPoolExecutor, especially for CPU-bound tasks.

    Fibonacci calculation (ThreadPoolExecutor & ProcessPoolExecutor)

    import concurrent.futures
    import numpy as np
    import timeit
    
    # Define a Fibonacci calculation function
    def fib(n):
        return n if n <= 1 else fib(n - 1) + fib(n - 2)
    
    if __name__ == "__main__":
        n_values = np.arange(30, 41)  # Calculate Fibonacci numbers for a range of values
    
        # Using ProcessPoolExecutor
        setup_code = '''
    from __main__ import fib
    from concurrent.futures import ProcessPoolExecutor
    import numpy as np
    n_values = np.arange(30, 41)
        '''
    
        execution_code = '''
    with ProcessPoolExecutor() as executor:
        results = list(executor.map(fib, n_values))
        '''
    
        time_taken = timeit.timeit(execution_code, setup=setup_code, number=1)
        print(f"ProcessPoolExecutor took {time_taken} seconds")
    
    # ProcessPoolExecutor took 28.703153874994314 seconds
    
    import concurrent.futures
    import numpy as np
    import timeit
    
    # Define a Fibonacci calculation function
    def fib(n):
        return n if n <= 1 else fib(n - 1) + fib(n - 2)
    
    if __name__ == "__main__":
        n_values = np.arange(30, 41)  # Calculate Fibonacci numbers for a range of values
    
        # Using ThreadPoolExecutor
        setup_code = '''
    from __main__ import fib
    from concurrent.futures import ThreadPoolExecutor
    import numpy as np
    n_values = np.arange(30, 41)
        '''
    
        execution_code = '''
    with ThreadPoolExecutor() as executor:
        results = list(executor.map(fib, n_values))
        '''
    
        time_taken = timeit.timeit(execution_code, setup=setup_code, number=1)
        print(f"ThreadPoolExecutor took {time_taken} seconds")
    
    # ThreadPoolExecutor took 65.74527391699667 seconds
    

    In your specific case of concatenating fragmented files from external S3 storage, the Databricks dbutils API doesn't offer significant advantages. Since you're dealing with file system operations on external storage (the details depend on the storage type, but here it's S3 buckets), regular Python provides portability and flexibility and can be used both within and outside the Databricks environment. It's well suited to your task, and you can achieve parallelism with ThreadPoolExecutor (ProcessPoolExecutor is recommended for efficiency), as you've demonstrated.
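
    For reference, here is a minimal sketch of that last point: listing one entity's fragments with dbutils.fs versus plain Python over the /dbfs FUSE mount. The mount point /mnt/my-bucket is hypothetical and assumes the S3 bucket has been mounted; the plain-Python variant is the one that also runs outside Databricks.

    # Hypothetical mount point; assumes the external S3 bucket is mounted at /mnt/my-bucket.
    # dbutils.fs.ls returns FileInfo objects and only works inside Databricks...
    entries = dbutils.fs.ls("dbfs:/mnt/my-bucket/fragmented_files/entity_1/")
    fragment_paths = [e.path for e in entries if e.name.endswith(".csv")]
    
    # ...while regular Python reads the same files through the /dbfs FUSE mount
    # and is portable to any environment where the paths are reachable.
    import os
    local_dir = "/dbfs/mnt/my-bucket/fragmented_files/entity_1"
    fragment_paths = sorted(
        os.path.join(local_dir, name)
        for name in os.listdir(local_dir)
        if name.endswith(".csv")
    )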