I am extracting data from three sources: CSV_1, CSV_2, and DB 1. CSV_1 and CSV_2 have different schemas and encodings. I am using pandas.read_csv to read the CSVs and pandas.read_sql to pull from the DB. I am new to concurrency/parallelism, but from my understanding, since I/O is my constraint here, multithreading could help me achieve speed gains.
I thought I would be able to use concurrent.futures.ThreadPoolExecutor and its map method to read the CSVs in parallel, like so:
from concurrent.futures import ThreadPoolExecutor
import pandas as pd

files = ['csv_1.csv', 'csv_2.csv']
with ThreadPoolExecutor(2) as executor:
    results = executor.map(pd.read_csv, files)
but from what I've seen, since I need to apply different parameters to read_csv (i.e. encoding and dtypes), this won't work. Is there a way to use the same function with different parameters for each item in the iterable? This part alone, reading two different CSVs concurrently when they require different pd.read_csv parameters, would be a huge win.
Ideally I would also like to add a third thread that reads from the DB.
Is there a way to achieve this?
There are several ways to do it. You just need an interim function that knows how to expand the single parameter used by map into the call you want. In the first example, you can have some canned types. This is nice because it's easy to see the parameters used for each type:
def csv_reader(params):
    # Unpack the (filename, type tag) pair passed in by executor.map
    filename, csv_type = params
    if csv_type == 'footype':
        return pd.read_csv(filename, sep="|")
    elif csv_type == 'bartype':
        # read_csv has no "columns" keyword; usecols selects which columns to read
        return pd.read_csv(filename, usecols=["A", "B", "C"])

files = [('csv_1.csv', 'footype'), ('csv_2.csv', 'bartype')]
with ThreadPoolExecutor(2) as executor:
    results = executor.map(csv_reader, files)
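Note that executor.map returns a lazy iterator that yields results in submission order, so you can unpack it directly (df_1, df_2 = results) or wrap it in list() to materialize the DataFrames before the pool shuts down.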
But you could always make it generic:
def csv_reader_generic(params):
    # Unpack (filename, positional args, keyword args) and forward them to read_csv
    filename, args, kwargs = params
    return pd.read_csv(filename, *args, **kwargs)

files = [('csv_1.csv', (), {"sep": "|"}),
         ('csv_2.csv', (), {"usecols": ["A", "B", "C"]})]
with ThreadPoolExecutor(2) as executor:
    results = executor.map(csv_reader_generic, files)
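As for the third thread reading from the DB: once the tasks are no longer all calls to the same function, executor.submit is often simpler than map, since each task can take its own callable and arguments. Here is a minimal sketch, assuming a SQLAlchemy engine; the connection string and query are placeholders, not real values:

from concurrent.futures import ThreadPoolExecutor
import pandas as pd
import sqlalchemy

# Hypothetical connection string -- replace with your own
engine = sqlalchemy.create_engine("postgresql://user:pass@host/dbname")

with ThreadPoolExecutor(3) as executor:
    # Each submit schedules one task; all three run concurrently
    csv1_future = executor.submit(pd.read_csv, 'csv_1.csv', sep="|")
    csv2_future = executor.submit(pd.read_csv, 'csv_2.csv',
                                  usecols=["A", "B", "C"])
    # Placeholder query -- replace with your own
    db_future = executor.submit(pd.read_sql, "SELECT * FROM some_table", engine)

    # .result() blocks until that task finishes and returns its DataFrame
    df_csv1 = csv1_future.result()
    df_csv2 = csv2_future.result()
    df_db = db_future.result()

Since each future carries its own call, there is no need for an interim function at all; the trade-off is that you manage the futures yourself instead of getting an ordered iterator from map.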