I'm trying to test a queued Redis job, but the meta data doesn't seem to be passing between the task and the originator. The job IDs appear to match, so I'm perplexed. Maybe some fresh eyes can help me work out the problem.
The task is as per the documentation:
from rq import get_current_job
def do_test(word):
    job = get_current_job()
    print job.get_id()
    job.meta['word'] = word
    job.save()
    print "saved: ", job.meta['word']
    return True
The rqworker log prints the job_id and word after it is saved:
14:32:32 *** Listening on default...
14:33:07 default: labeller.do_test('supercalafragelistic') (a6e2e579-df26-411a-b017-8788d621149f)
a6e2e579-df26-411a-b017-8788d621149f
saved: supercalafragelistic
14:33:07 Job OK, result = True
14:33:07 Result is kept for 500 seconds.
The task is invoked from a unittest:
class RedisQueueTestCase(unittest.TestCase):
    """
    Requires running "rqworker" on the localhost cmdline
    """
    def setUp(self):
        use_connection()
        self.q = Queue()

    def test_enqueue(self):
        job = self.q.enqueue(do_test, "supercalafragelistic")
        while True:
            print job.get_id(), job.get_status(), job.meta.get('word')
            if job.is_finished:
                print "Result: ", job.result, job.meta.get('word')
                break
            time.sleep(0.25)
It generates this log, which shows the same job_id and the correct result, but the meta variable word is never populated:
Testing started at 2:33 PM ...
a6e2e579-df26-411a-b017-8788d621149f queued None
a6e2e579-df26-411a-b017-8788d621149f finished None
Result: True None
Process finished with exit code 0
I tried adding a long delay so the log has a chance to show the task in the started, but not yet finished, state (in case meta is cleared when it finishes), but it didn't make any difference.
Any idea what I've missed?
The local job doesn't automatically update itself after a save occurs at the remote end. One must do a refresh to update it. Before the refactoring this was not necessary, as I was doing a fetch_job with the job_id on every request.
So the test routine needs to include a refresh() (or a fetch_job) to reflect any changes:
def test_enqueue(self):
    job = self.q.enqueue(do_test, "supercalafragelistic")
    while True:
        job.refresh()  # <--- well, duh, freddy
        print job.get_id(), job.get_status(), job.meta.get('word')
        if job.is_finished:
            print "Result: ", job.result, job.meta.get('word')
            break
        time.sleep(0.25)
Which works a bit better:
Testing started at 5:14 PM ...
6ea0163f-b5d5-411a-906a-f765aa0b3cc6 queued None 0 []
6ea0163f-b5d5-411a-906a-f765aa0b3cc6 started supercalafragelistic
6ea0163f-b5d5-411a-906a-f765aa0b3cc6 finished supercalafragelistic
Result: True supercalafragelistic
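As an aside, the loop above spins forever if no rqworker is running. Here is a rough sketch of a bounded version, assuming only that the job object offers refresh() and is_finished as used in the test. The names wait_for_job and FakeJob are my own, made up for illustration; FakeJob is just a stand-in so the helper can be tried without Redis.

```python
import time


def wait_for_job(job, timeout=10.0, interval=0.25):
    """Poll a job-like object until it finishes or the timeout expires.

    Assumes `job` offers refresh() and is_finished, as used in the test above.
    Returns True if the job finished, False on timeout.
    """
    deadline = time.time() + timeout
    while True:
        job.refresh()  # pull the latest state before reading anything
        if job.is_finished:
            return True
        if time.time() >= deadline:
            return False
        time.sleep(interval)


class FakeJob(object):
    """Hypothetical stand-in so the helper can be tried without a worker."""

    def __init__(self, finish_after):
        self.refreshes = 0
        self.finish_after = finish_after
        self.is_finished = False

    def refresh(self):
        # Pretend the remote end finishes after a fixed number of refreshes.
        self.refreshes += 1
        self.is_finished = self.refreshes >= self.finish_after
```

In the unittest this would be something like self.assertTrue(wait_for_job(job)), followed by the assertions on job.result and job.meta.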
The fact that get_status was updating fooled me into overlooking this: get_status() is a method that goes and looks up the current status, whereas meta is just a pointer to some possibly stale data somewhere.
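To make that difference concrete, here is a toy model of what I think was going on. It is not rq's actual implementation: a plain dict stands in for Redis, and ToyJob is a hypothetical class whose get_status() reads the store on every call while meta is only a local copy that refresh() reloads.

```python
# Toy model only: a plain dict stands in for Redis, and ToyJob is a
# hypothetical class, not rq's Job implementation.
STORE = {}  # job_id -> {"status": ..., "meta": {...}}


class ToyJob(object):
    def __init__(self, job_id):
        self.id = job_id
        self.meta = {}  # local copy, filled only by refresh()
        self.refresh()

    def get_status(self):
        # Reads the shared store on every call, so it is always current.
        return STORE[self.id]["status"]

    def refresh(self):
        # Re-reads meta from the shared store into the local copy.
        self.meta = dict(STORE[self.id]["meta"])

    def save(self):
        # Writes the local meta back to the shared store.
        STORE[self.id]["meta"] = dict(self.meta)


STORE["job-1"] = {"status": "queued", "meta": {}}
local = ToyJob("job-1")   # the copy held by the test
remote = ToyJob("job-1")  # the copy held by the worker

# The "worker" updates meta and status at its end.
remote.meta["word"] = "supercalafragelistic"
remote.save()
STORE["job-1"]["status"] = "finished"

status_before = local.get_status()    # live read from the store
word_before = local.meta.get("word")  # stale local copy
local.refresh()
word_after = local.meta.get("word")   # up to date after refresh()
```

Here the status already reads as finished while the local meta is still empty, which is the same pattern as in my first log; only after refresh() does the word show up.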