I'm having problems running multithreaded tasks using Python RQ (tested on v0.5.6 and v0.6.0). Consider the following piece of code, a simplified version of what I'm trying to achieve:
thing.py:

    from threading import Thread

    class MyThing(object):
        def say_hello(self):
            while True:
                print "Hello World"

        def hello_task(self):
            t = Thread(target=self.say_hello)
            t.daemon = True  # seems like it makes no difference
            t.start()
            t.join()
main.py:

    from rq import Queue
    from redis import Redis

    from thing import MyThing

    conn = Redis()
    q = Queue(connection=conn)
    q.enqueue(MyThing().say_hello, timeout=5)
When executing main.py (while rqworker is running in the background), the job is killed by the timeout as expected, within 5 seconds. The problem is that when I enqueue a task that spawns a thread, such as MyThing().hello_task, the thread runs forever and nothing happens when the 5-second timeout expires. How can I run a multithreaded task with RQ, such that the timeout kills the task, its sons, grandsons and their wives?
When you run t.join() with no timeout, the hello_task thread blocks until the say_hello thread returns, so it never gets a chance to receive the timeout signal from RQ. You can let the main thread keep running, and properly receive the timeout signal, by calling Thread.join with a set amount of time to wait, in a loop that checks whether the thread is still alive. Like so:
    def hello_task(self):
        t = Thread(target=self.say_hello)
        t.start()
        while t.isAlive():
            t.join(1)  # block for at most 1 second at a time
Because join(1) returns control to hello_task every second, RQ's timeout signal gets a chance to be handled between calls. That way you can also catch the timeout exception and handle it, if you wish:
    from rq.timeouts import JobTimeoutException

    def hello_task(self):
        t = Thread(target=self.say_hello)
        t.start()
        try:
            while t.isAlive():
                t.join(1)  # block for at most 1 second at a time
        except JobTimeoutException:  # raised by rq when the timeout expires
            print "Thread killed due to timeout"
            raise
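Note that catching the exception doesn't by itself stop say_hello: its while True loop keeps running until the work-horse process exits. If you control the loop body, you can make the child thread actually stop on timeout with a cooperative shutdown flag. Here's a minimal sketch; the stop_flag attribute and the Event-based loop condition are my own additions, not anything RQ provides:

    from threading import Thread, Event
    from rq.timeouts import JobTimeoutException

    class MyThing(object):
        def __init__(self):
            self.stop_flag = Event()  # hypothetical shutdown flag, not part of rq

        def say_hello(self):
            # loop until asked to stop, instead of forever
            while not self.stop_flag.is_set():
                print "Hello World"

        def hello_task(self):
            t = Thread(target=self.say_hello)
            t.start()
            try:
                while t.isAlive():
                    t.join(1)  # same pattern as above, so the signal can arrive
            except JobTimeoutException:
                self.stop_flag.set()  # ask say_hello to exit its loop
                t.join()              # returns promptly once the flag is seen
                raise                 # re-raise so rq still marks the job as failed

The bare t.join() inside the except block is safe here only because the flag has already been set, so the thread is about to return on its own.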