Tags: python, multithreading, python-2.7, python-rq, django-rq

RQ Timeout does not kill multi-threaded jobs


I'm having problems running multithreaded tasks with Python RQ (tested on v0.5.6 and v0.6.0).

Consider the following piece of code, as a simplified version of what I'm trying to achieve:

thing.py

from threading import Thread

class MyThing(object):
    def say_hello(self):
        while True:
            print "Hello World"

    def hello_task(self):
        t = Thread(target=self.say_hello)
        t.daemon = True # seems like it makes no difference
        t.start()
        t.join()

main.py

from rq import Queue
from redis import Redis
from thing import MyThing

conn = Redis()

q = Queue(connection=conn)

q.enqueue(MyThing().say_hello, timeout=5)

When I execute main.py (with rqworker running in the background), the job is stopped by the timeout after 5 seconds, as expected.

The problem is that when I enqueue a task that spawns one or more threads, such as MyThing().hello_task, the thread runs forever and nothing happens when the 5-second timeout expires.

How can I run a multithreaded task with RQ, such that the timeout will kill the task, its sons, grandsons and their wives?


Solution

  • When you call t.join() with no timeout, the thread running hello_task blocks until the say_hello thread returns, so it never gets a chance to handle the timeout signal from RQ. To let the main thread keep running and receive the timeout signal, call Thread.join with a timeout, in a loop, until the thread has finished. Like so:

    def hello_task(self):
        t = Thread(target=self.say_hello)
        t.start()
        while t.is_alive():  # is_alive() works on Python 2.6+ and 3.x
            t.join(1)  # Wait at most 1 second, then check again
    

    That way you could also catch the timeout exception and handle it, if you wish:

    from rq.timeouts import JobTimeoutException

    def hello_task(self):
        t = Thread(target=self.say_hello)
        t.start()
        try:
            while t.is_alive():
                t.join(1)  # Wait at most 1 second, then check again
        except JobTimeoutException:
            # Raised in the main thread when the RQ timeout expires
            print "Thread killed due to timeout"
            raise
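Catching the exception only stops the main thread from waiting. The threading module has no API for killing a thread, so the say_hello thread has to be asked to stop. Below is a minimal sketch of one way to do that, assuming you can change the worker loop to poll a threading.Event. SimulatedTimeout and simulate_timeout_after are hypothetical stand-ins, not part of RQ: they take the place of rq.timeouts.JobTimeoutException and of the SIGALRM-based timeout the RQ worker arms on Unix, so the snippet runs without Redis. hello_task is written as a plain function for brevity, and print is avoided so the code runs on both Python 2.7 and 3.

```python
import signal
import threading
import time


class SimulatedTimeout(Exception):
    """Hypothetical stand-in for rq.timeouts.JobTimeoutException."""


def simulate_timeout_after(seconds):
    # Hypothetical helper: mimics a signal-based job timeout.
    # Unix only, and must be called from the main thread.
    def _raise_timeout(signum, frame):
        raise SimulatedTimeout("job exceeded %s seconds" % seconds)

    signal.signal(signal.SIGALRM, _raise_timeout)
    signal.setitimer(signal.ITIMER_REAL, seconds)


def say_hello(stop_event, log):
    # Check the stop flag on every iteration instead of looping forever.
    while not stop_event.is_set():
        log.append("Hello World")
        time.sleep(0.01)


def hello_task(poll_seconds=0.1):
    stop_event = threading.Event()
    log = []
    t = threading.Thread(target=say_hello, args=(stop_event, log),
                         name="hello-worker")
    t.daemon = True
    t.start()
    try:
        while t.is_alive():
            # Short joins keep the main thread able to handle the signal.
            t.join(poll_seconds)
    except SimulatedTimeout:
        stop_event.set()  # ask the worker thread to finish
        t.join(1)         # give it a bounded amount of time to exit
        raise
    return log
```

Calling simulate_timeout_after(0.3) and then hello_task() raises SimulatedTimeout after roughly 0.3 seconds, and the worker thread has already been asked to stop by the time the exception propagates. In a real RQ job you would catch JobTimeoutException instead and would not install a signal handler yourself.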