I've written a CLI tool to generate simulations, and I'm hoping to generate about 10k of them (~10 minutes) for each of the ~200 cuts of data I have. I have functions that do this fine in a for loop, but when I converted it to use concurrent.futures.ProcessPoolExecutor(),
I realized that multiple processes can't all read the same parquet file into a pandas DataFrame.
Here's the smallest example I could think of:
import concurrent.futures
import pandas as pd


def example():
    # This is a static table with basic information like distributions
    df = pd.read_parquet("batch/data/mappings.pq")
    # Then there's a bunch of ETL, even reading in a few other static tables
    return sum(df.shape)


def main():
    results = []
    with concurrent.futures.ProcessPoolExecutor() as pool:
        futr_results = [pool.submit(example) for _ in range(100)]
        done_results = concurrent.futures.as_completed(futr_results)
        for _ in futr_results:
            results.append(next(done_results).result())
    return results


if __name__ == "__main__":
    print(main())
Errors:
<jemalloc>: background thread creation failed (11)
terminate called after throwing an instance of 'std::system_error'
what(): Resource temporarily unavailable
Traceback (most recent call last):
  File "batch/testing.py", line 19, in <module>
    main()
  File "batch/testing.py", line 14, in main
    results.append(next(done_results).result())
  File "/home/a114383/miniconda3/envs/hailsims/lib/python3.7/concurrent/futures/_base.py", line 425, in result
    return self.__get_result()
  File "/home/a114383/miniconda3/envs/hailsims/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
I'm hoping there's a quick and dirty way to read these in (perhaps by copy rather than by reference?). Otherwise it looks like I'll need to create all the parameters up front instead of loading them on the fly.
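The fallback mentioned above, reading the static table once in the parent and handing the workers only plain Python parameters, could be sketched roughly as follows. This is an illustration under assumptions, not the asker's code: build_params, simulate, run and the "mean" column are hypothetical placeholders for the real ETL and simulation. Capping max_workers is included on the assumption that the jemalloc "background thread creation failed" error comes from hitting a thread limit; that is a guess the traceback does not confirm.

```python
import concurrent.futures


def build_params(df, n_sims):
    # Hypothetical: turn each row of the static table into one parameter dict,
    # repeated n_sims times. Replace with the real ETL.
    records = df.to_dict("records")
    return [rec for rec in records for _ in range(n_sims)]


def simulate(params):
    # Hypothetical stand-in for one simulation. It only receives plain Python
    # data, so no worker ever opens the parquet file itself.
    return params["mean"] * 2


def run(df, n_sims, max_workers=4):
    params = build_params(df, n_sims)
    # Assumption: a small, fixed worker count keeps the total number of
    # processes (and the threads each one spawns) bounded.
    with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as pool:
        # chunksize batches many small tasks per round trip to cut IPC overhead;
        # map() returns results in the same order as params.
        return list(pool.map(simulate, params, chunksize=64))
```

From a script, call it under the if __name__ == "__main__": guard, e.g. run(pd.read_parquet("batch/data/mappings.pq"), n_sims=10000). The DataFrame is read exactly once, in the parent, and only the small parameter dicts are pickled to the workers.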
Three things I would try:
Pandas has an option (the engine argument of pd.read_parquet) for using either PyArrow or FastParquet when reading parquet files. Try switching to the other one; this may be a bug in one of them.
Try forcing pandas to open the file in read-only mode, to rule out conflicts caused by the file being locked:
# Parquet is a binary format, so the file must be opened in binary mode ("rb"), not text mode ("r")
with open("batch/data/mappings.pq", "rb") as f:
    df = pd.read_parquet(f)
Or read the whole file into memory first and hand pandas an in-memory buffer:
import io

# Parquet is binary, so this has to be a BytesIO; io.StringIO will not work here
with open("batch/data/mappings.pq", "rb") as f:
    data = io.BytesIO(f.read())
pd.read_parquet(data)