python, redis, python-rq

Storing "meta" data on redis job is not working?


I'm trying to test a queued redis job, but the meta data doesn't seem to be passed between the task and the originator. The job_ids appear to match, so I'm perplexed. Maybe some fresh eyes can help me work out the problem:

The task is as per the documentation:

from rq import get_current_job

def do_test(word):
    job = get_current_job()
    print job.get_id()
    job.meta['word'] = word   # stash the word on the job's meta dict
    job.save()                # persist the job (including meta) back to redis
    print "saved: ", job.meta['word']
    return True

The rqworker log prints the job_id and the word after it is saved:

14:32:32 *** Listening on default...
14:33:07 default: labeller.do_test('supercalafragelistic') (a6e2e579-df26-411a-b017-8788d621149f)
a6e2e579-df26-411a-b017-8788d621149f
saved:  supercalafragelistic
14:33:07 Job OK, result = True
14:33:07 Result is kept for 500 seconds.

The task is invoked from a unittest:

import time
import unittest

from rq import Queue, use_connection

from labeller import do_test   # the task module name, per the worker log


class RedisQueueTestCase(unittest.TestCase):
    """
    Requires running "rqworker" on the localhost cmdline
    """
    def setUp(self):
        use_connection()
        self.q = Queue()

    def test_enqueue(self):
        job = self.q.enqueue(do_test, "supercalafragelistic")
        while True:
            print job.get_id(), job.get_status(), job.meta.get('word')
            if job.is_finished:
                print "Result: ", job.result, job.meta.get('word')
                break
            time.sleep(0.25)

It generates this log, showing the same job_id and the correct result, but the meta key 'word' is never populated.

Testing started at 2:33 PM ...
a6e2e579-df26-411a-b017-8788d621149f queued None
a6e2e579-df26-411a-b017-8788d621149f finished None
Result:  True None

Process finished with exit code 0

I tried adding a long delay so the loop would have a chance to see the task in the started (but not yet finished) state, in case meta is cleared when the job finishes, but it made no difference.
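
A minimal sketch of that variant, assuming the delay simply goes inside do_test after the save (the delay length and the do_test_slow name are illustrative, not from the original code):

import time
from rq import get_current_job

def do_test_slow(word):
    # Same as do_test, but holds the job in the "started" state long
    # enough for the polling loop to observe it before it finishes.
    job = get_current_job()
    job.meta['word'] = word
    job.save()
    time.sleep(5)   # assumed delay; anything longer than the poll interval works
    return True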

Any idea what I've missed?


Solution

  • The local Job instance doesn't automatically update itself after a save() happens at the worker end; you have to refresh() it to pull in the changes. Before refactoring my code this wasn't necessary, because I was doing a fetch_job() with the job_id on every request.

    So the test routine needs a refresh() (or a fetch_job(); see the sketch at the end of this answer) to pick up the changes:

    def test_enqueue(self):
        job = self.q.enqueue(do_test, "supercalafragelistic")
        while True:
            job.refresh()     #<--- well, duh, freddy
            print job.get_id(), job.get_status(), job.meta.get('word')
            if job.is_finished:
                print "Result: ", job.result, job.meta.get('word')
                break
            time.sleep(0.25)
    

    Which works a bit better:

    Testing started at 5:14 PM ...
    6ea0163f-b5d5-411a-906a-f765aa0b3cc6 queued None 0 []
    6ea0163f-b5d5-411a-906a-f765aa0b3cc6 started supercalafragelistic
    6ea0163f-b5d5-411a-906a-f765aa0b3cc6 finished supercalafragelistic
    Result:  True supercalafragelistic
    

    The fact that get_status() was updating fooled me into overlooking this: get_status() is a method that goes and looks up the current status, whereas meta is just a reference to some possibly stale local data.
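
    For completeness, here is a rough sketch of the fetch_job() alternative mentioned above (poll_with_fetch is just an illustrative name, not part of rq). Re-fetching the job by its id builds a fresh Job, so its meta is read straight from redis and no refresh() is needed:

    import time

    def poll_with_fetch(q, job_id):
        # Fetch a fresh Job from the queue on every pass instead of
        # refreshing the instance returned by enqueue().
        while True:
            job = q.fetch_job(job_id)
            print job.get_id(), job.get_status(), job.meta.get('word')
            if job.is_finished:
                return job.result, job.meta.get('word')
            time.sleep(0.25)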