Tags: python, pandas, multithreading, python-multithreading, concurrent.futures

How to read from multiple different sources concurrently?


I am extracting data from 3 data sources, CSV_1, CSV_2, and DB 1. CSV_1 and CSV_2 have different schemas and encodings. I am using pandas.read_csv to read the CSVs and pandas.read_sql to pull from the DB. I am new to concurrency/parallelism, but from my understanding, since I/O is my constraint here, multithreading could help me achieve speed gains.

I thought I would be able to use concurrent.futures.ThreadPoolExecutor and its map method to read the CSVs in parallel, like so:

from concurrent.futures import ThreadPoolExecutor
import pandas as pd

files = ['csv_1.csv', 'csv_2.csv']

with ThreadPoolExecutor(2) as executor:
    results = executor.map(pd.read_csv, files)

but from what I've seen, since I need to pass different parameters to read_csv (i.e. encoding and dtypes), this won't work. Is there a way to use the same function with different parameters for each item in the iterable? This part alone, reading two different CSVs concurrently that require different pd.read_csv parameters, would be a huge win.

Ideally I would also like to add a third thread that reads from the DB.

Is there a way to achieve this?


Solution

  • There are several ways to do it. You just need an interim function that knows how to expand the single parameter used by map into the call you want. In the first example, you can have some canned types. This is nice because it's easy to see the parameters you want for each type:

    def csv_reader(params):
        # unpack the (filename, type) tuple passed by map
        filename, csv_type = params
        if csv_type == 'footype':
            return pd.read_csv(filename, sep="|")
        elif csv_type == 'bartype':
            return pd.read_csv(filename, usecols=["A", "B", "C"])
    
    files = [('csv_1.csv', 'footype'), ('csv_2.csv', 'bartype')]
    
    with ThreadPoolExecutor(2) as executor:
        results = executor.map(csv_reader, files)
    

    But you could always make it generic:

    def csv_reader_generic(params):
        # unpack (filename, positional args, keyword args)
        filename, args, kwargs = params
        return pd.read_csv(filename, *args, **kwargs)
    
    files = [('csv_1.csv', tuple(), {"sep": "|"}), 
        ('csv_2.csv', tuple(), {"usecols": ["A", "B", "C"]})]
    
    with ThreadPoolExecutor(2) as executor:
        results = executor.map(csv_reader_generic, files)
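    As for the third thread that reads from the DB: you don't need map at all for heterogeneous work, because executor.submit lets each task call a different function with its own arguments, and the returned futures preserve submission order. Here is a minimal sketch of that pattern — the two stub readers below are placeholders I've made up so the snippet runs standalone; in your pipeline you would call pd.read_csv and pd.read_sql instead:

    from concurrent.futures import ThreadPoolExecutor

    # Placeholder readers: swap these for pd.read_csv / pd.read_sql.
    # They just echo their arguments so the pattern is visible.
    def read_csv_stub(filename, **kwargs):
        return ('csv', filename, kwargs)

    def read_db_stub(query):
        return ('db', query)

    with ThreadPoolExecutor(max_workers=3) as executor:
        # submit() takes a callable plus its own args/kwargs,
        # so each task can use a different function and parameters
        futures = [
            executor.submit(read_csv_stub, 'csv_1.csv', sep='|'),
            executor.submit(read_csv_stub, 'csv_2.csv', usecols=['A', 'B', 'C']),
            executor.submit(read_db_stub, 'SELECT * FROM table_1'),
        ]
        # result() blocks until each task finishes; list order
        # matches the order the tasks were submitted in
        results = [f.result() for f in futures]

    Since the three reads are I/O-bound, the threads overlap their waiting time even under the GIL, which is where the speedup comes from.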