Search code examples
pythonunit-testingceleryassert

unittest celery task assertRaises


I have some celery task. I want to test it via unittest.

I'm doing something very similar to:

class TestMe(unittest.TestCase):
    def test_celery_task(self):
        self.assertRaises(ValueError, celery_task.apply, args)

what is strange for me:

this assert fails, because ValueError not raised, but during executing process I can see ValueError as a result of this celery task.

I'm not sure, but it looks like assert is checking faster than ValueError is rising. Is it possible to check the result of executed celery task? or how it may be tested?


Solution

  • That can't possibly work. When you enqueue a Celery task, all that happens is that you put a message into the queue for a separate process to pick up; it is that process that runs the task and, potentially, raises the exception.

    If you want to check that the task itself raises ValueError, then you should call the task, not the delay function:

    self.assertRaises(ValueError, celery_task, args)