python sqlite sqlalchemy concurrent.futures

Concurrent.futures and SQLAlchemy benchmarks vs. synchronous code


I have a project where I need to upload ~70 files to my Flask app. I'm learning concurrency right now, so this seems like perfect practice. When using print statements instead of the database writes, the concurrent version of this function is about 2x to 2.5x faster than the synchronous one.

When actually writing to the SQLite database, though, both versions take about the same amount of time.

Original func:

@app.route('/test_sync')
def auto_add():

    t0 = time.time()

    # Code does not work without changing directory. better option?
    os.chdir('my_app/static/tracks')

    # After chdir the relative path above no longer resolves, so list the current directory
    list_dir = os.listdir()

    # list_dir consists of .mp3 and .jpg files
    for filename in list_dir:
        if filename.endswith('.mp3'):
            try:
                thumbnail = [thumb for thumb in list_dir if thumb == filename[:-4] + '.jpg'][0]
            except Exception:
                print(f'ERROR - COULD NOT FIND THUMB for { filename }')

            resize_image(thumbnail)

            with open(filename, 'rb') as f, open(thumbnail, 'rb') as t:

                track = Track(

                    title=filename[15:-4], 
                    artist='Sam Gellaitry',
                    description='No desc.', 
                    thumbnail=t.read(),
                    binary_audio=f.read()
                )

        else:
            continue


        db.session.add(track)

    db.session.commit()
    elapsed = time.time() - t0

    return f'Uploaded all tracks in {elapsed} seconds.'

Concurrent func(s):

@app.route('/test_concurrent')
def auto_add_concurrent():

    t0 = time.time()
    MAX_WORKERS = 40

    os.chdir('/my_app/static/tracks')
    list_dir = os.listdir('/my_app/static/tracks')
    mp3_list = [x for x in list_dir if x.endswith('.mp3')]

    with futures.ThreadPoolExecutor(MAX_WORKERS) as executor:
        res = executor.map(add_one_file, mp3_list)

    for x in res:
        db.session.add(x)

    db.session.commit()
    elapsed = time.time() - t0

    return f'Uploaded all tracks in {elapsed} seconds.'

----- 

def add_one_file(filename):

    list_dir = os.listdir('/my_app/static/tracks')

    try:
        thumbnail = [thumb for thumb in list_dir if thumb == filename[:-4] + '.jpg'][0]

    except Exception:
        print(f'ERROR - COULD NOT FIND THUMB for { filename }')

    resize_image(thumbnail)

    with open(filename, 'rb') as f, open(thumbnail, 'rb') as t:

        track = Track(

            title=filename[15:-4], 
            artist='Sam Gellaitry',
            description='No desc.', 
            thumbnail=t.read(),
            binary_audio=f.read()
        )

    return track

Here's the resize_image func, for completeness:

def resize_image(thumbnail):

    with Image.open(thumbnail) as img:
        # resize() returns a new image; it does not modify img in place
        resized = img.resize((500, 500))

    resized.save(thumbnail)

    return thumbnail

And benchmarks:

/test_concurrent (with print statements)
Uploaded all tracks in 0.7054300308227539 seconds.

/test_sync
Uploaded all tracks in 1.8661110401153564 seconds.

------
/test_concurrent (with db.session.add/db.session.commit)
Uploaded all tracks in 5.303245782852173 seconds.

/test_sync 
Uploaded all tracks in 6.123792886734009 seconds.

What am I doing wrong with this concurrent code, and how can I optimize it?


Solution

  • It seems that the DB writes dominate your timings, and they do not usually benefit from parallelization when writing many rows to the same table or, in the case of SQLite, to the same database. Instead of adding the ORM objects to the session one by one, perform a bulk insert:

    db.session.bulk_save_objects(list(res))
    

    In your current code the ORM has to insert the Track objects one at a time during the flush just before the commit, in order to fetch their primary keys after each insert. Session.bulk_save_objects does not do that by default, which means that the objects are less usable afterwards (they are not added to the session, for example), but that does not seem to be an issue in your case.
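
As a rough illustration of that split, the sketch below does the per-file work in a thread pool and then writes all rows in a single batch. It deliberately uses the standard-library sqlite3 module and executemany rather than SQLAlchemy, so it shows the shape of the approach and not what bulk_save_objects does internally. The names load_track and bulk_insert_tracks and the track table layout are hypothetical stand-ins, not part of your app.

```python
import sqlite3
from concurrent.futures import ThreadPoolExecutor


def load_track(name):
    # Hypothetical stand-in for the per-file work (reading MP3 and thumbnail bytes).
    # It builds one row tuple and never touches the database.
    return (name, b"audio-" + name.encode(), b"thumb-" + name.encode())


def bulk_insert_tracks(conn, names, max_workers=8):
    # Run the per-file work in parallel threads...
    with ThreadPoolExecutor(max_workers) as executor:
        rows = list(executor.map(load_track, names))

    # ...then write every row with one executemany call inside one transaction.
    # "with conn" commits on success and rolls back on an exception.
    with conn:
        conn.executemany(
            "INSERT INTO track (title, binary_audio, thumbnail) VALUES (?, ?, ?)",
            rows,
        )
    return len(rows)


# Assumed schema, for illustration only.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE track ("
    "id INTEGER PRIMARY KEY, title TEXT, binary_audio BLOB, thumbnail BLOB)"
)

inserted = bulk_insert_tracks(conn, [f"track{i}.mp3" for i in range(70)])
count = conn.execute("SELECT COUNT(*) FROM track").fetchone()[0]
```

Only the file handling runs in the pool here; the database sees a single writer and a single commit, which is the part that parallel workers would not speed up anyway.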

    "I’m inserting 400,000 rows with the ORM and it’s really slow!" is a good read on the subject.


    As a side note, when working with files it is best to avoid TOCTOU (time-of-check to time-of-use) situations where possible. In other words, don't use

    thumbnail = [thumb for thumb in list_dir if thumb == filename[:-4] + '.jpg'][0]
    

    to check whether the file exists. Use os.path.isfile() or similar instead if you must, but it is better to simply try to open the file and handle the error if it cannot be opened:

    thumbnail = filename[:-4] + '.jpg'
    
    try:
        resize_image(thumbnail)
    
    except FileNotFoundError:
        print(f'ERROR - COULD NOT FIND THUMB for { filename }')
        # Note that the latter open attempt will fail as well, if this fails
    
    ...
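
To make the open-and-handle-the-error advice concrete, here is a minimal sketch that reads both files for one track and skips the track if either is missing. The helper name read_track_files is hypothetical, and deriving the thumbnail name with os.path.splitext (instead of slicing off the last four characters) is an assumption on my part, not something your code requires.

```python
import os
import tempfile


def read_track_files(mp3_path):
    """Return (audio_bytes, thumbnail_bytes), or None if either file is missing."""
    # Derive the thumbnail name from the MP3 name instead of searching a listing.
    thumbnail_path = os.path.splitext(mp3_path)[0] + ".jpg"
    try:
        # Open directly; a missing file raises here, so no separate existence check.
        with open(mp3_path, "rb") as f, open(thumbnail_path, "rb") as t:
            return f.read(), t.read()
    except FileNotFoundError as exc:
        print(f"ERROR - could not open {exc.filename}")
        return None


# Small demonstration in a temporary directory: a.mp3 has a thumbnail, b.mp3 does not.
with tempfile.TemporaryDirectory() as tmp:
    complete = os.path.join(tmp, "a.mp3")
    orphan = os.path.join(tmp, "b.mp3")
    files = [
        (complete, b"audio"),
        (os.path.join(tmp, "a.jpg"), b"thumb"),
        (orphan, b"audio"),
    ]
    for path, data in files:
        with open(path, "wb") as fh:
            fh.write(data)

    found = read_track_files(complete)
    missing = read_track_files(orphan)
```

A caller can then drop the None results before building Track objects, so a missing thumbnail no longer leads to a later failure on an undefined variable.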