I have a Python script that reads a million or so rows from a CSV file. For each row, it makes one or two SQL queries against a table in Postgres. I would like to introduce some parallelism to speed up the process. I'm not yet an expert in that regard, so I would like to know when to use multi-threading, multi-processing, or async programming in Python. Here's the current synchronous code below:
def distribute_rows_to_files(file_path: str, limit: int | None = None) -> None:
    """Route each input CSV row into one of four output files via DB lookups.

    For every data row, look up the ``customer_id`` for the row's ``i_code``
    in ``buckets``. If a bucket matches, check whether that
    (customer_id, t_id) pair exists in ``customers`` and append the row to
    ``exists_data.csv`` or ``c_not_exists_data.csv`` accordingly; if no
    bucket matches, append it to ``i_not_exists_data.csv``. Any row that
    raises is appended, with the error text, to ``exceptions.csv``.

    Args:
        file_path: Path of the input CSV. The first line is a header.
        limit: Optional cap on rows processed; ``None`` processes all rows.
            (The original hard-coded a 100-row debug break.)
    """
    # Hoist the SQL out of the loop; both queries are loop-invariant.
    bucket_query = """
        SELECT customer_id
        FROM buckets
        WHERE i_code = %(i_code)s
        LIMIT 1
    """
    customer_query = """
        SELECT EXISTS (
            SELECT 1
            FROM customers
            WHERE customer_id = %(customer_id)s
              AND t_id = %(t_id)s
        )
    """
    # Context managers guarantee every handle closes even if a row raises;
    # the original leaked all four output files on an unhandled error.
    with open("exists_data.csv", mode='a', newline='') as exists_fh, \
         open("c_not_exists_data.csv", mode='a', newline='') as c_not_exists_fh, \
         open("i_not_exists_data.csv", mode='a', newline='') as i_not_exists_fh, \
         open("exceptions.csv", mode='a', newline='') as exception_fh, \
         psycopg2.connect(**AUTHENTICATOR_PG_DB_CREDENTIALS) as conn, \
         open(file_path, mode='r', newline='') as infile:
        # Build each writer once instead of once per row.
        exists_writer = csv.writer(exists_fh)
        c_not_exists_writer = csv.writer(c_not_exists_fh)
        i_not_exists_writer = csv.writer(i_not_exists_fh)
        exception_writer = csv.writer(exception_fh)

        reader = csv.reader(infile)
        next(reader)  # Skip the header line
        for count, row in enumerate(reader, start=1):
            if limit is not None and count > limit:
                break
            i_code, t_id, *_ = row
            try:
                # psycopg2 cursors are context managers; no manual close().
                with conn.cursor() as cur:
                    cur.execute(bucket_query, {"i_code": i_code})
                    result = cur.fetchone()
                    if result:
                        # BUG FIX: the original bound {"toe_id": toe_id} — an
                        # undefined name — for a query that uses %(t_id)s.
                        cur.execute(
                            customer_query,
                            {"customer_id": result[0], "t_id": t_id},
                        )
                        exists = cur.fetchone()[0]
                        writer = exists_writer if exists else c_not_exists_writer
                        writer.writerow(row)
                    else:
                        i_not_exists_writer.writerow(row)
            except Exception as e:
                # BUG FIX: `row` is a list; the original's `row + (str(e),)`
                # raised TypeError (list + tuple), crashing the error path.
                exception_writer.writerow(row + [str(e)])
If I could get a reason for whichever approach to take, and what the code would look like, that would be great! I've done some reading that suggests asyncpg over psycopg2 for async work — but that's assuming the async programming route is the approach to take.
So I did some studying: you typically use async (or multi-threading) for I/O-bound, blocking operations, and multi-processing for CPU-intensive work. Here's a way to implement the above with asyncpg:
import csv
import asyncio
import asyncpg
async def process_row(row, pool, exists_file_handle, c_not_exists_file_handle,
                      i_not_exists_file_handle, exception_handle):
    """Classify one CSV row via up to two DB lookups and append it to a file.

    Looks up the customer_id for the row's i_code in ``buckets``; if found,
    checks whether the (customer_id, t_id) pair exists in ``customers`` and
    writes the row to the matching output handle. Rows that raise are
    written, with the error text, to ``exception_handle``.

    Args:
        row: One parsed CSV row (list of strings); first two fields are
            i_code and t_id.
        pool: An ``asyncpg`` connection pool.
        exists_file_handle / c_not_exists_file_handle /
        i_not_exists_file_handle / exception_handle: Open, appendable
            text-file handles shared by all tasks.
    """
    i_code, t_id, *_ = row
    try:
        async with pool.acquire() as conn:
            # NOTE(review): int(i_code) assumes the buckets.i_code column is
            # an integer type — confirm against the schema.
            customer_id = await conn.fetchval("""
                SELECT customer_id
                FROM buckets
                WHERE i_code = $1
                LIMIT 1
            """, int(i_code))
            # `is not None` rather than truthiness: a customer_id of 0 is
            # still a match.
            if customer_id is not None:
                # BUG FIX: the original dropped the t_id condition that the
                # synchronous version applied, so rows with a matching
                # customer but a different t_id were misclassified.
                exists = await conn.fetchval("""
                    SELECT EXISTS (
                        SELECT 1
                        FROM customers
                        WHERE customer_id = $1
                          AND t_id = $2
                    )
                """, customer_id, t_id)
                target = exists_file_handle if exists else c_not_exists_file_handle
                csv.writer(target).writerow(row)
            else:
                csv.writer(i_not_exists_file_handle).writerow(row)
    except Exception as e:
        # BUG FIX: build a new list instead of mutating the caller's row
        # in place with row.append(...).
        csv.writer(exception_handle).writerow(row + [f"exception -- {e}"])
        print(f"exception -- {e}")
async def distribute_rows_to_files(file_path: str, limit: int | None = None,
                                   max_concurrency: int = 50) -> None:
    """Fan input CSV rows out to ``process_row`` tasks over a connection pool.

    Opens the four output files, creates an asyncpg pool, and runs one
    ``process_row`` task per data row, bounded by ``max_concurrency``.

    Args:
        file_path: Path of the input CSV. The first line is a header.
        limit: Optional cap on rows processed; ``None`` processes all rows.
            (The original hard-coded a 100-row debug break.)
        max_concurrency: Maximum number of rows in flight at once — without
            this a million-row file would run a million tasks concurrently
            and exhaust the pool.
    """
    pool = await asyncpg.create_pool(**AUTHENTICATOR_PG_DB_CREDENTIALS)
    semaphore = asyncio.Semaphore(max_concurrency)
    try:
        # BUG FIX: the original mixed the names `ica_not_exists_file` /
        # `i_not_exists_file` and `ica_not_exists_file_handle` /
        # `i_not_exists_file_handle`, raising NameError at open and at close.
        # Context managers also guarantee the handles close on error.
        with open("exists_data.csv", mode='a', newline='') as exists_fh, \
             open("c_not_exists_data.csv", mode='a', newline='') as c_not_exists_fh, \
             open("i_not_exists_data.csv", mode='a', newline='') as i_not_exists_fh, \
             open("exceptions.csv", mode='a', newline='') as exception_fh, \
             open(file_path, mode='r', newline='') as infile:

            async def bounded(row):
                # Hold a semaphore slot for the lifetime of the row's work.
                async with semaphore:
                    await process_row(row, pool, exists_fh, c_not_exists_fh,
                                      i_not_exists_fh, exception_fh)

            reader = csv.reader(infile)
            next(reader)  # Skip the header line
            tasks = []
            for count, row in enumerate(reader, start=1):
                if limit is not None and count > limit:
                    break
                tasks.append(bounded(row))
            await asyncio.gather(*tasks)
    finally:
        # Close the pool even if reading or gathering fails.
        await pool.close()
# Guard the entry point so importing this module doesn't kick off the run.
if __name__ == "__main__":
    csv_file = "CS_MemberMapping_all_added.csv"
    asyncio.run(distribute_rows_to_files(csv_file))