I have three Celery tasks:
@celery_app.task
def load_rawdata_on_monday():
if not load_rawdata(): # run synchronously
notify_report_was_not_updated.delay()
@celery_app.task
def load_rawdata():
# load and process file from FTP
return False # some error happened
@celery_app.task
def notify_rawdata_was_not_updated():
pass # send email by Django
I need to test that email was sent if load_rawdata
task (function) returns False
. For that I have written some test which does not work:
@override_settings(EMAIL_BACKEND='django.core.mail.backends.memcache.EmailBackend')
@override_settings(CELERY_ALWAYS_EAGER=False)
@patch('load_rawdata', MagicMock(return_value=False))
def test_load_rawdata_on_monday():
load_rawdata_on_monday()
assert len(mail.outbox) == 1, "Inbox is not empty"
assert mail.outbox[0].subject == 'Subject here'
assert mail.outbox[0].body == 'Here is the message.'
assert mail.outbox[0].from_email == 'from@example.com'
assert mail.outbox[0].to == ['to@example.com']
It seems notify_rawdata_was_not_updated
still being run asynchronously.
How to write proper test?
I should'v use CELERY_TASK_ALWAYS_EAGER
instead of CELERY_ALWAYS_EAGER