I'm runnning a Bokeh server, using the underlying Tornado framework.
I need the server to refresh some data at some point. This is done by fetching rows from an Oracle DB, using Cx_Oracle.
Thanks to Tornado's PeriodicCallback, the program checks every 30 seconds if new data should be loaded:
server.start()
from tornado.ioloop import PeriodicCallback
pcallback = PeriodicCallback(db_obj.reload_data_async, 10 * 1e3)
pcallback.start()
server.io_loop.start()
Where db_obj
is an instance of a class which takes care of the DB related functions (connect, fetch, ...).
Basically, this is how the reload_data_async
function looks like:
executor = concurrent.futures.ThreadPoolExecutor(4)
# methods of the db_obj class ...
@gen.coroutine
def reload_data_async(self):
# ... first, some code to check if the data should be reloaded ...
# ...
if data_should_be_reloaded:
new_data = yield executor.submit(self.fetch_data)
def fetch_data(self):
""" fetch new data in the DB """
cursor = cx.Cursor(self.db_connection)
cursor.execute("some SQL select request that takes time (select * from ...)")
rows = cursor.fetchall()
# some more processing thereafter
# ...
Basically, this works. But when I try to read the data while it's being load in fetch_data
(by clicking for display in the GUI), the program crashes due to race condition (I guess?): it's accessing the data while it's being fetched at the same time.
I just discovered that tornado.concurrent.futures are not thread-safe:
tornado.concurrent.Future is similar to concurrent.futures.Future, but not thread-safe (and therefore faster for use with single-threaded event loops).
All in all, I think I should create a new thread to take care of the CX_Oracle operations. Can I do that using Tornado and keep using the PerodicCallback
function? How can I convert my asynchronous operation to be thread-safe? What's the way to do this?
PS: Im using Python 2.7
Thanks
Solved it!
@Sraw is right: it should not cause crash.
Explanation: fetch_data()
is using a cx Oracle Connection object (self.db_connection
), which is NOT thread-safe by default. Setting the threaded
parameter to True
wraps the shared connection with a mutex, as described in Cx Oracle documentation:
The threaded parameter is expected to be a boolean expression which indicates whether or not Oracle should wrap accesses to connections with a mutex. Doing so in single threaded applications imposes a performance penalty of about 10-15% which is why the default is False.
So I in my code, I just modified the following and it now works without crashing when the user tries to access data while it's being refreshed:
# inside the connect method of the db_obj class
self.db_connection = cx.connect('connection string', threaded=True) # False by default