Tags: python, postgresql, parallel-processing, python-asyncio

Async Programming vs Multi-threading for multiple reads from Postgres


So I have a Python script that is going to read a million or so rows from a CSV file. With the data from each row, I am going to make at least one and at most two SQL queries against a table in Postgres. I would like to introduce some parallelism to speed up the process. I'm not yet an expert in that regard, so I would like to know when to use multi-threading, multi-processing, or async programming in Python. Here's the current synchronous code:

import csv

import psycopg2


def distribute_rows_to_files(file_path: str) -> None:
    exists_file = "exists_data.csv"
    c_not_exists_file = "c_not_exists_data.csv"
    i_not_exists_file = "i_not_exists_data.csv"
    exception_file = "exceptions.csv"


    # Open the files before the loop to reduce overhead
    exists_file_handle = open(exists_file, mode='a', newline='')
    c_not_exists_file_handle = open(c_not_exists_file, mode='a', newline='')
    i_not_exists_file_handle = open(i_not_exists_file, mode='a', newline='')
    exception_handle = open(exception_file, mode='a', newline='')


    with psycopg2.connect(**AUTHENTICATOR_PG_DB_CREDENTIALS) as conn:
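        # Note: psycopg2's connection context manager commits or rolls back the
        # transaction on exit, but it does not close the connection itself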
        with open(file_path, mode='r', newline='') as file:
            reader = csv.reader(file)
            next(reader)  # Skip the header line


            count = 0
            for row in reader:
                count += 1
                # Temporary cap while testing; remove to process the full file
                if count == 100:
                    break


                # Process each row here
                i_code, t_id, __, ___, ____ = row
                try:
                    cur = conn.cursor()
                    query = """
                        SELECT customer_id
                        FROM buckets
                        WHERE i_code = %(i_code)s
                        LIMIT 1
                    """
                    cur.execute(query, {"i_code": i_code})
                    result = cur.fetchone()
                    cur.close()


                    if result:
                        try:
                            cur = conn.cursor()
                            second_query = """
                                SELECT EXISTS (
                                    SELECT 1
                                    FROM customers
                                    WHERE customer_id = %(customer_id)s
                                    AND t_id = %(t_id)s
                                )
                            """
                            cur.execute(second_query, {"customer_id": result[0], "t_id": t_id})
                            exists = cur.fetchone()[0]
                            cur.close()


                            file_handle = exists_file_handle if exists else c_not_exists_file_handle
                            writer = csv.writer(file_handle)
                            writer.writerow(row)
                        except Exception as e:
                            row_with_exception = row + [str(e)]
                            writer = csv.writer(exception_handle)
                            writer.writerow(row_with_exception)
                    else:
                        writer = csv.writer(i_not_exists_file_handle)
                        writer.writerow(row)


                except Exception as e:
                    row_with_exception = row + [str(e)]
                    writer = csv.writer(exception_handle)
                    writer.writerow(row_with_exception)


    exists_file_handle.close()
    c_not_exists_file_handle.close()
    i_not_exists_file_handle.close()
    exception_handle.close()

If I could get a reason for whichever approach to take and what the code would look like, that would be great! I've done some reading that suggests asyncpg over psycopg2 for async work, but that's assuming the async programming route is the one to take.


Solution

  • So I did some studying: you typically use async for I/O-bound and blocking operations, and multi-processing for CPU-intensive work. Python threads don't speed up CPU-bound code because of the GIL, but they do handle blocking I/O fine (a thread-pool sketch follows the asyncpg code below). Since this workload is almost entirely database I/O, asyncio is a good fit. Here's the way to implement the above with asyncpg:

    import csv
    import asyncio
    import asyncpg
    
    
    
    async def process_row(row, pool, exists_file_handle, c_not_exists_file_handle, i_not_exists_file_handle, exception_handle):
        i_code, t_id, __, ___, ____ = row
        try:
            async with pool.acquire() as conn:
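                # asyncpg uses PostgreSQL's native $1-style parameters instead of psycopg2's %(name)s placeholders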
                result = await conn.fetchval("""
                    SELECT customer_id
                    FROM buckets
                    WHERE i_code = $1
                    LIMIT 1
                """, int(i_code))
    
                if result:
                    exists = await conn.fetchval("""
                        SELECT EXISTS (
                            SELECT 1
                            FROM customers
                            WHERE customer_id = $1
                            AND t_id = $2
                        )
                    """, result, t_id)
    
                    file_handle = exists_file_handle if exists else c_not_exists_file_handle
                    writer = csv.writer(file_handle)
                    writer.writerow(row)
                else:
                    writer = csv.writer(i_not_exists_file_handle)
                    writer.writerow(row)
        except Exception as e:
            row.append(f"exception -- {e}")
            writer = csv.writer(exception_handle)
            writer.writerow(row)
            print(f"exception -- {e}")
    
    async def distribute_rows_to_files(file_path: str) -> None:
        exists_file = "exists_data.csv"
        c_not_exists_file = "c_not_exists_data.csv"
        i_not_exists_file = "i_not_exists_data.csv"
        exception_file = "exceptions.csv"
    
        pool = await asyncpg.create_pool(**AUTHENTICATOR_PG_DB_CREDENTIALS)
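        # create_pool defaults to min_size=10/max_size=10 connections; tune to what the DB can handle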
    
        exists_file_handle = open(exists_file, mode='a', newline='')
        c_not_exists_file_handle = open(c_not_exists_file, mode='a', newline='')
        i_not_exists_file_handle = open(i_not_exists_file, mode='a', newline='')
        exception_handle = open(exception_file, mode='a', newline='')
    
        with open(file_path, mode='r', newline='') as file:
            reader = csv.reader(file)
            next(reader)  # Skip the header line
    
            tasks = []
            count = 0
            for row in reader:
                count += 1
                if count == 100:  # Same temporary test cap as the synchronous version
                    break
                task = process_row(row, pool, exists_file_handle, c_not_exists_file_handle, i_not_exists_file_handle, exception_handle)
                tasks.append(task)
    
            # All row tasks run concurrently here; the pool caps simultaneous DB
            # connections, but every task exists up front (see the bounded
            # sketch after this solution before running the full file)
            await asyncio.gather(*tasks)
    
        for file_handle in (exists_file_handle, c_not_exists_file_handle, i_not_exists_file_handle, exception_handle):
            if file_handle:
                file_handle.close()
    
        await pool.close()
    
    csv_file = "CS_MemberMapping_all_added.csv"
    asyncio.run(distribute_rows_to_files(csv_file))
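
  • One caveat with the gather approach: it creates a task for every row before awaiting any of them, which is fine for a 100-row test but heavy for the full million-row file. A bounded variant keeps a fixed set of workers fed from an asyncio.Queue, so memory stays flat and the reader gets backpressure. This is a minimal sketch reusing the process_row coroutine above; NUM_WORKERS and the queue size are illustrative values, not tuned numbers:

    import asyncio
    import csv

    NUM_WORKERS = 50  # illustrative; keep it near the pool's max_size

    async def worker(queue, pool, handles):
        # Each worker pulls rows until the queue is drained
        while True:
            row = await queue.get()
            try:
                await process_row(row, pool, *handles)
            finally:
                queue.task_done()

    async def distribute_rows_bounded(file_path, pool, handles):
        # maxsize gives backpressure: the reader suspends when workers fall behind
        queue = asyncio.Queue(maxsize=1000)
        workers = [asyncio.create_task(worker(queue, pool, handles))
                   for _ in range(NUM_WORKERS)]

        with open(file_path, mode='r', newline='') as file:
            reader = csv.reader(file)
            next(reader)  # Skip the header line
            for row in reader:
                await queue.put(row)  # waits while the queue is full

        await queue.join()  # wait for every queued row to finish
        for w in workers:
            w.cancel()

  • For comparison, the multi-threaded shape of the same job with psycopg2 would be a concurrent.futures.ThreadPoolExecutor. Threads are also a reasonable fit here because the work is blocking I/O rather than CPU: the GIL is released while a thread waits on the database. A hedged sketch follows; handle_row is a hypothetical stand-in for the two queries from the question, and the CSV writes would need a shared threading.Lock since the workers share the output files. psycopg2 connections are safe to share across threads, but queries on one connection serialize, so one connection per worker thread gets real parallelism:

    import csv
    import threading
    from concurrent.futures import ThreadPoolExecutor

    import psycopg2

    local = threading.local()  # one connection per worker thread

    def get_conn():
        if not hasattr(local, "conn"):
            local.conn = psycopg2.connect(**AUTHENTICATOR_PG_DB_CREDENTIALS)
        return local.conn

    def handle_row(row):
        conn = get_conn()
        ...  # same two queries and CSV routing as the synchronous version

    def run_threaded(file_path: str) -> None:
        with open(file_path, mode='r', newline='') as file:
            reader = csv.reader(file)
            next(reader)  # Skip the header line
            with ThreadPoolExecutor(max_workers=20) as executor:
                # Note: map submits every row up front, like the gather version
                executor.map(handle_row, reader)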