python, python-3.x, pandas, concurrent.futures

Reading pandas from disk during concurrent process pool


I've written a CLI tool to generate simulations, and I'm hoping to generate about 10k (~10 minutes) for each cut of data; I have ~200 of them. I have functions that do this fine in a for loop, but when I converted it to concurrent.futures.ProcessPoolExecutor() I realized that multiple processes can't read in the same pandas DataFrame.

Here's the smallest example I could think of:

import concurrent.futures
import pandas as pd

def example():
    # This is a static table with basic information like distributions
    df = pd.read_parquet("batch/data/mappings.pq")
    # Then there's a bunch of etl, even reading in a few other static tables
    return sum(df.shape)

def main():
    results = []
    with concurrent.futures.ProcessPoolExecutor() as pool:
        futr_results = [pool.submit(example) for _ in range(100)]
        done_results = concurrent.futures.as_completed(futr_results)
        for _ in futr_results: 
            results.append(next(done_results).result())

    return results

if __name__ == "__main__":
    print(main())

Errors:

<jemalloc>: background thread creation failed (11)
terminate called after throwing an instance of 'std::system_error'
  what():  Resource temporarily unavailable
Traceback (most recent call last):
  File "batch/testing.py", line 19, in <module>
    main()
  File "batch/testing.py", line 14, in main
    results.append(next(done_results).result())
  File "/home/a114383/miniconda3/envs/hailsims/lib/python3.7/concurrent/futures/_base.py", line 425, in result
    return self.__get_result()
  File "/home/a114383/miniconda3/envs/hailsims/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

I'm hoping there's a quick and dirty way to read these (I'm guessing without a shared reference?); otherwise it looks like I'll need to create all the parameters up front instead of getting them on the fly.
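
In case it helps, here's a minimal sketch of the "create all the parameters first" fallback I mean - read the static table once in the parent and pass plain values to the workers (the params here are just a stand-in for whatever my real ETL produces):

import concurrent.futures
import pandas as pd

def example(params):
    # Workers get plain picklable values instead of reading the parquet file.
    return sum(params)

def main():
    # Read once in the parent; the children never open the file.
    df = pd.read_parquet("batch/data/mappings.pq")
    params = df.shape  # stand-in for the real per-simulation parameters

    with concurrent.futures.ProcessPoolExecutor() as pool:
        futures = [pool.submit(example, params) for _ in range(100)]
        return [fut.result() for fut in concurrent.futures.as_completed(futures)]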


Solution

  • Three things I would try:

    • pandas can read parquet files with either PyArrow or fastparquet, selected via the engine argument of read_parquet. Try switching to the one you aren't currently using - this looks like a bug in the engine rather than in your code (see the sketch just below).
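
    A minimal sketch of the engine swap, assuming fastparquet is installed alongside pyarrow:

    # Explicitly pick the parquet engine; the default is "auto" (pyarrow if available).
    df = pd.read_parquet("batch/data/mappings.pq", engine="fastparquet")
    # or force the other engine:
    df = pd.read_parquet("batch/data/mappings.pq", engine="pyarrow")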

    • Try handing pandas an already-open, read-only file handle, to prevent conflicts if the file is being locked:

    # Parquet is a binary format, so pandas needs a binary ("rb") file handle here.
    with open("batch/data/mappings.pq", "rb") as f:
        df = pd.read_parquet(f)
    
    • Try loading the file into an in-memory BytesIO buffer and handing that to pandas - this avoids any interaction with the file from pandas itself. (Parquet is a binary format, so BytesIO is the right buffer; a StringIO of decoded text won't work here.)

    import io

    # Read the whole file into memory once, then give pandas the buffer.
    with open("batch/data/mappings.pq", "rb") as f:
        data = io.BytesIO(f.read())

    df = pd.read_parquet(data)
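
If the buffered read works, you can go one step further: read the bytes once in the parent process and ship them to every worker, so the child processes never touch the file at all. A sketch under that assumption (names are illustrative, not from the original code):

    import concurrent.futures
    import io

    import pandas as pd

    def example(raw):
        # Rebuild the buffer inside the worker; no disk access happens here.
        df = pd.read_parquet(io.BytesIO(raw))
        return sum(df.shape)

    def main():
        # Read from disk exactly once, in the parent.
        with open("batch/data/mappings.pq", "rb") as f:
            raw = f.read()

        with concurrent.futures.ProcessPoolExecutor() as pool:
            futures = [pool.submit(example, raw) for _ in range(100)]
            return [fut.result() for fut in concurrent.futures.as_completed(futures)]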