Tags: python, multithreading, pyramid

Pyramid: Multi-threaded Database Operation


My application receives one or more URLs (typically 3-4) from the user, scrapes certain data from those URLs, and writes the data to the database. Because the scraping takes a while, I was thinking of running each scrape in a separate thread so that scraping and writing to the database can continue in the background and the user does not have to wait.

To implement that, I have (relevant parts only):

from threading import Thread

from pyramid.view import view_config

@view_config(route_name="add_movie", renderer="templates/add_movie.jinja2")
def add_movie(request):
    post_data = request.POST

    if "movies" in post_data:
        # splitlines() copes with both "\n" and the "\r\n" browsers send for textareas.
        movies = post_data["movies"].splitlines()

        for movie_id in movies:
            # Scrape and store each movie in its own background thread.
            movie_thread = Thread(target=store_movie_details, args=(movie_id,))
            movie_thread.start()

    return {}

def store_movie_details(movie_id):

    movie_details = scrape_data(movie_id)
    new_movie = Movie(**movie_details) # Movie is my model.

    print(new_movie)  # Works fine.

    print(DBSession.add(movies(**movie_details)))  # Returns None.

While new_movie prints the correctly scraped data, DBSession.add() doesn't seem to work: it just returns None.

If I remove the threads and call store_movie_details() directly, it works fine.

What's going on?


Solution

  • Firstly, the SQLAlchemy docs for Session.add() do not describe a return value, and the method does return None, so that result is expected and is not a sign that anything failed.

    Secondly, I think you meant to add new_movie to the session, not movies(**movie_details), whatever that is :)

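    Thirdly, the standard Pyramid session (the one configured with ZopeTransactionExtension) is tied to Pyramid's request-response cycle: the transaction is committed at the end of each request (typically by pyramid_tm). That transaction is thread-local, though, so an object added from your worker thread most likely joins a separate transaction that nobody ever commits, which is why nothing reaches the database. You need to configure a separate session and commit it manually in store_movie_details. This session should use scoped_session so that each thread gets its own session object and sessions are not shared across threads.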

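    from sqlalchemy.orm import scoped_session
    from sqlalchemy.orm import sessionmaker

    # some_engine is the Engine your application already creates in main().
    session_factory = sessionmaker(bind=some_engine)
    AsyncSession = scoped_session(session_factory)

    def store_movie_details(movie_id):
        # Do the slow scraping before touching the database.
        movie_details = scrape_data(movie_id)
        new_movie = Movie(**movie_details) # Movie is my model.

        session = AsyncSession()  # thread-local session for this worker thread
        try:
            session.add(new_movie)
            session.commit()  # no request transaction will commit this for us
        except Exception:
            session.rollback()
            raise
        finally:
            AsyncSession.remove()  # close and discard this thread's session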
    

    And, of course, this approach is only suitable for very lightweight tasks, and only if you don't mind occasionally losing a task (for example, when the web server restarts). For anything more serious, have a look at Celery or a similar task queue, as Antoine Leclair suggests.
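To try the same pattern without a Pyramid app, here is a stdlib-only sketch that swaps SQLAlchemy's scoped_session for threading.local and a plain sqlite3 connection. It is an analogue of the approach above, not SQLAlchemy or Pyramid code; scrape_data, the movie table, start_workers and run_demo are hypothetical stand-ins. Each worker thread gets its own connection, commits explicitly, and then closes and forgets that connection.

```python
import os
import sqlite3
import tempfile
import threading

# Hypothetical stand-in for the real scraper; returns fake details.
def scrape_data(movie_id):
    return {"movie_id": movie_id, "title": "Title of " + movie_id}

# One slot per thread, playing the role of scoped_session's registry.
_local = threading.local()

def get_connection(db_path):
    """Return this thread's connection, creating it on first use."""
    conn = getattr(_local, "conn", None)
    if conn is None:
        conn = sqlite3.connect(db_path, timeout=10)
        _local.conn = conn
    return conn

def remove_connection():
    """Close and forget this thread's connection (like scoped_session.remove())."""
    conn = getattr(_local, "conn", None)
    if conn is not None:
        conn.close()
        del _local.conn

def store_movie_details(db_path, movie_id):
    details = scrape_data(movie_id)  # slow work first, no DB needed yet
    conn = get_connection(db_path)
    try:
        conn.execute(
            "INSERT INTO movie (movie_id, title) VALUES (?, ?)",
            (details["movie_id"], details["title"]),
        )
        conn.commit()  # nothing else will commit this thread's work
    except Exception:
        conn.rollback()
        raise
    finally:
        remove_connection()

def start_workers(db_path, movie_ids):
    """Start one background thread per movie, as the original view does."""
    threads = [
        threading.Thread(target=store_movie_details, args=(db_path, movie_id))
        for movie_id in movie_ids
    ]
    for thread in threads:
        thread.start()
    return threads

def run_demo(movie_ids):
    """Create a throwaway DB, run the workers, and return the stored ids."""
    with tempfile.TemporaryDirectory() as tmp:
        db_path = os.path.join(tmp, "movies.db")
        setup = sqlite3.connect(db_path)
        setup.execute("CREATE TABLE movie (movie_id TEXT PRIMARY KEY, title TEXT)")
        setup.commit()
        setup.close()

        for thread in start_workers(db_path, movie_ids):
            thread.join()  # only so the demo can check the result

        check = sqlite3.connect(db_path)
        rows = check.execute("SELECT movie_id FROM movie ORDER BY movie_id").fetchall()
        check.close()
    return [row[0] for row in rows]

if __name__ == "__main__":
    print(run_demo(["tt001", "tt002", "tt003"]))
```

In a real view you would not join the threads; the demo does so only to read the rows back. Dropping the conn.commit() call loses the rows when the connection closes, which mirrors what happens to an uncommitted session in a worker thread. sqlite3 also refuses by default to use one connection from several threads (check_same_thread=True), which is the same reason the answer recommends a thread-local session.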