Search code examples
pythonmultithreadingasynchronoustornadocx-oracle

Tornado background task using cx_Oracle


I'm runnning a Bokeh server, using the underlying Tornado framework.

I need the server to refresh some data at some point. This is done by fetching rows from an Oracle DB, using Cx_Oracle.

Thanks to Tornado's PeriodicCallback, the program checks every 30 seconds if new data should be loaded:

server.start() 
from tornado.ioloop import PeriodicCallback
pcallback = PeriodicCallback(db_obj.reload_data_async, 10 * 1e3)
pcallback.start()
server.io_loop.start()

Where db_obj is an instance of a class which takes care of the DB related functions (connect, fetch, ...).

Basically, this is how the reload_data_async function looks like:

executor = concurrent.futures.ThreadPoolExecutor(4)

# methods of the db_obj class ...

@gen.coroutine
def reload_data_async(self):
  # ... first, some code to check if the data should be reloaded ...
  # ...
  if data_should_be_reloaded:
    new_data = yield executor.submit(self.fetch_data)

def fetch_data(self):
   """ fetch new data in the DB """
   cursor = cx.Cursor(self.db_connection) 
   cursor.execute("some SQL select request that takes time (select * from ...)")

   rows = cursor.fetchall()
   # some more processing thereafter 
   # ...

Basically, this works. But when I try to read the data while it's being load in fetch_data (by clicking for display in the GUI), the program crashes due to race condition (I guess?): it's accessing the data while it's being fetched at the same time.

I just discovered that tornado.concurrent.futures are not thread-safe:

tornado.concurrent.Future is similar to concurrent.futures.Future, but not thread-safe (and therefore faster for use with single-threaded event loops).

All in all, I think I should create a new thread to take care of the CX_Oracle operations. Can I do that using Tornado and keep using the PerodicCallback function? How can I convert my asynchronous operation to be thread-safe? What's the way to do this?

PS: Im using Python 2.7

Thanks


Solution

  • Solved it!

    @Sraw is right: it should not cause crash.

    Explanation: fetch_data() is using a cx Oracle Connection object (self.db_connection), which is NOT thread-safe by default. Setting the threaded parameter to True wraps the shared connection with a mutex, as described in Cx Oracle documentation:

    The threaded parameter is expected to be a boolean expression which indicates whether or not Oracle should wrap accesses to connections with a mutex. Doing so in single threaded applications imposes a performance penalty of about 10-15% which is why the default is False.

    So I in my code, I just modified the following and it now works without crashing when the user tries to access data while it's being refreshed:

    # inside the connect method of the db_obj class
    self.db_connection = cx.connect('connection string', threaded=True) # False by default