I have a large file, with a JSON record on each line. I'm writing a script to upload a subset of these records to CouchDB via the API, and experimenting with different approaches to see what works the fastest. Here's what I've found to work fastest to slowest (on a CouchDB instance on my localhost):
Read each needed record into memory. After all records are in memory, generate an upload coroutine for each record, and gather/run all the coroutines at once
Synchronously read file and when a needed record is encountered, synchronously upload
Use aiofiles
to read the file, and when a needed record is encountered, asynchronously update
Approach #1 is much faster than the other two (about twice as fast). I am confused why approach #2 is faster than #3, especially in contrast to this example here, which takes half as much time to run asynchronously than synchronously (sync code not provided, had to rewrite it myself). Is it the context switching from file i/o to HTTP i/o, especially with file reads ocurring much more often than API uploads?
For additional illustration, here's some Python pseudo-code that represents each approach:
import json
import asyncio
import aiohttp
records = []
with open('records.txt', 'r') as record_file:
for line in record_file:
record = json.loads(line)
if valid(record):
records.append(record)
async def batch_upload(records):
async with aiohttp.ClientSession() as session:
tasks = []
for record in records:
task = async_upload(record, session)
tasks.append(task)
await asyncio.gather(*tasks)
asyncio.run(batch_upload(properties))
import json
with open('records.txt', 'r') as record_file:
for line in record_file:
record = json.loads(line)
if valid(record):
sync_upload(record)
import json
import asyncio
import aiohttp
import aiofiles
async def batch_upload()
async with aiohttp.ClientSession() as session:
async with open('records.txt', 'r') as record_file:
line = await record_file.readline()
while line:
record = json.loads(line)
if valid(record):
await async_upload(record, session)
line = await record_file.readline()
asyncio.run(batch_upload())
The file I'm developing this with is about 1.3 GB, with 100000 records total, 691 of which I upload. Each upload begins with a GET request to see if the record already exists in CouchDB. If it does, then a PUT is performed to update the CouchDB record with any new information; if it doesn't, then a the record is POSTed to the db. So, each upload consists of two API requests. For dev purposes, I'm only creating records, so I run the GET and POST requests, 1382 API calls total.
Approach #1 takes about 17 seconds, approach #2 takes about 33 seconds, and approach #3 takes about 42 seconds.
your code uses async but it does the work synchronously and in this case it will be slower than the sync approach. Asyc won't speed up the execution if not constructed/used effectively.
You can create 2 coroutines and make them run in parallel.. perhaps that speeds up the operation.
Example:
#!/usr/bin/env python3
import asyncio
async def upload(event, queue):
# This logic is not so correct when it comes to shutdown,
# but gives the idea
while not event.is_set():
record = await queue.get()
print(f'uploading record : {record}')
return
async def read(event, queue):
# dummy logic : instead read here and populate the queue.
for i in range(1, 10):
await queue.put(i)
# Initiate shutdown..
event.set()
async def main():
event = asyncio.Event()
queue = asyncio.Queue()
uploader = asyncio.create_task(upload(event, queue))
reader = asyncio.create_task(read(event, queue))
tasks = [uploader, reader]
await asyncio.gather(*tasks)
if __name__ == '__main__':
asyncio.run(main())