Tags: django, timezone, celery, django-celery, celerybeat

Celery beat: Change to individual timezone tasks causing validation error "Invalid timezone"


celery --version 5.1.2 (sun-harmonics)

django --version 3.2.8

I have a celery schedule that has four tasks that run in different timezones. I am using nowfun for setting the timezones and have set CELERY_ENABLE_UTC = False in settings.py. I followed the top response on this SO post: Celery beat - different time zone per task

Note that I made this change this morning - I was running a previous version of the code without these settings.

Currently, I am saving the celery results to CELERY_RESULT_BACKEND = 'django-db'.

Since implementing the change that lets different tasks run in different timezones, I get an error when I run celery -A backend beat -l info.

The output is very long, so here are the head and the tail.

Head:

[2021-10-29 07:29:36,059: INFO/MainProcess] beat: Starting...

[2021-10-29 07:29:36,067: ERROR/MainProcess] Cannot add entry 'celery.backend_cleanup' to database schedule: ValidationError(["Invalid timezone '<LocalTimezone: UTC+00>'"]). Contents: {'task': 'celery.backend_cleanup', 'schedule': <crontab: 0 4 * * * (m/h/d/dM/MY)>, 'options': {'expire_seconds': 43200}}

Tail:

django.core.exceptions.ValidationError: ["Invalid timezone '<LocalTimezone: UTC+00>'"]

Celery beat hangs on this last error message and I have to kill it with ctrl + c.

I went to the Celery documentation and read the instructions about manually resetting the database when timezone-related settings change. The website says:

$ python manage.py shell

>>> from django_celery_beat.models import PeriodicTask

>>> PeriodicTask.objects.update(last_run_at=None)

I then found some documentation that said:

Warning: If you change the Django TIME_ZONE setting your periodic task schedule will still be based on the old timezone. To fix that you would have to reset the “last run time” for each periodic task:

from django_celery_beat.models import PeriodicTask, PeriodicTasks

PeriodicTask.objects.all().update(last_run_at=None)

PeriodicTasks.changed()

Note that this will reset the state as if the periodic tasks have never run before.

So I think the cause is exactly what the warning describes: I changed timezones, the schedule is still based on the old UTC timezone, and I need to reset it. My schedules have run before, though, so when I type:

>>> PeriodicTask.objects.all().update(last_run_at=None)

I get the response:

13

and then when I enter:

>>> PeriodicTasks.changed()

I get a type error:

TypeError: changed() missing 1 required positional argument: 'instance'

So my question is:

How do I update PeriodicTask and PeriodicTasks? What arguments should I pass to PeriodicTasks.changed(), and is 13 the expected response to the first command?

Here is my celery.py:

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings
from celery.schedules import crontab
import pytz
from datetime import datetime

os.environ.setdefault(
    'DJANGO_SETTINGS_MODULE',
    'backend.settings'
)

app = Celery(
    'backend'
)

app.config_from_object(
    settings,
    namespace='CELERY'
)

def uk_time():
    return datetime.now(pytz.timezone('Europe/London'))

def us_time():
    return datetime.now(pytz.timezone('EST'))

def jp_time():
    return datetime.now(pytz.timezone('Japan'))

# Celery Beat Settings
app.conf.beat_schedule={
    'generate_signals_london': {
        'task': 'signals.tasks.generate_signals',
        'schedule': crontab(
            minute=0,
            hour=8,
            nowfun=uk_time,
            day_of_week='1,2,3,4,5'
        ),
        'args': ('UK',),
    },

    'generate_signals_ny': {
        'task': 'signals.tasks.generate_signals',
        'schedule': crontab(
            minute=0,
            hour=7,
            nowfun=us_time,
            day_of_week='1,2,3,4,5'
        ),
        'args': ('NY',),
    },

    'generate_signals_nyse': {
        'task': 'signals.tasks.generate_signals',
        'schedule': crontab(
            minute=0,
            hour=9,
            nowfun=us_time,
            day_of_week='1,2,3,4,5'
        ),
        'args': ('NYSE',),
    },

    'generate_signals_asia': {
        'task': 'signals.tasks.generate_signals',
        'schedule': crontab(
            minute=0,
            hour=8,
            nowfun=jp_time,
            day_of_week='1,2,3,4,5'
        ),
        'args': ('JP',),
    },

}

app.autodiscover_tasks()
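
One detail in the schedule above is worth illustrating: in the tz database, 'EST' is a fixed UTC-5 zone with no daylight saving, so a task scheduled for 07:00 'EST' drifts an hour away from New York wall-clock time in summer. The sketch below shows the difference. It assumes Python 3.9+ and uses the standard-library zoneinfo module instead of pytz; 'America/New_York' is used as a DST-aware zone, a fixed UTC-5 offset stands in for 'EST', and the dates and the helper name utc_hour are arbitrary choices for illustration.

```python
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo

NEW_YORK = ZoneInfo("America/New_York")     # observes daylight saving
FIXED_EST = timezone(timedelta(hours=-5))   # stand-in for the fixed-offset 'EST' zone


def utc_hour(year, month, day, hour, tz):
    """Return the UTC hour at which `hour`:00 local time in `tz` occurs."""
    local = datetime(year, month, day, hour, tzinfo=tz)
    return local.astimezone(timezone.utc).hour


# 07:00 New York time in winter (UTC-5) and in summer (UTC-4)
winter_ny = utc_hour(2021, 1, 15, 7, NEW_YORK)
summer_ny = utc_hour(2021, 7, 15, 7, NEW_YORK)

# 07:00 at a fixed UTC-5 offset stays at the same UTC hour all year
summer_fixed = utc_hour(2021, 7, 15, 7, FIXED_EST)

print(winter_ny, summer_ny, summer_fixed)
```

If the tasks are meant to follow local market hours, a DST-aware zone name is probably what is wanted; if they are meant to stay fixed relative to UTC, the fixed offset is the right choice.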

Solution

  • When tasks run in different timezones and those timezones observe DST, the schedule has to be updated dynamically rather than defined once.

    Create a task that updates the beat schedule objects in the database, and register it in the Celery app module:

    import os
    from django import setup
    from celery import Celery
    from celery.schedules import crontab
    
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'api.settings')
    setup()
    app = Celery('api')
    app.conf.timezone = 'UTC'
    
    app.config_from_object('django.conf:settings', namespace='CELERY')
    app.conf.broker_connection_retry_on_startup = True
    
    # Register database scheduler for beat
    app.conf.beat_scheduler = 'django_celery_beat.schedulers:DatabaseScheduler'
    
    # Register our `update_beat_schedule` task to run every Sunday at 20:00 UTC
    app.conf.beat_schedule = {
        'update_beat_schedule': {
            'task': 'utility_app.tasks.update_beat_schedule',
            'schedule': crontab(hour=20, minute=0, day_of_week='sun'),
            'args': ()
        },
    }
    
    app.autodiscover_tasks()
    

    Then have the task build each schedule with everything it needs and update the PeriodicTask model. I look up each task by name first so that existing instances are updated; otherwise a new instance would be created on every run.

    from django_celery_beat.models import PeriodicTask, CrontabSchedule
    from celery import shared_task
    import json
    from pytz import timezone
    from datetime import datetime
    from utility_app.utils import first_business_days
    
    class UtilsAppError(Exception):
        def __init__(self, message):
            self.message = message
            super().__init__(f"{message}")
    
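    def get_mt4_timezone():
        # The offset is chosen from the current DST status of US/Eastern.
        # POSIX-style names have an inverted sign: 'Etc/GMT-3' means UTC+3.
        eastern = timezone('US/Eastern')
        # Take "now" directly in US/Eastern instead of localizing the
        # server's naive local time, which is wrong on non-Eastern hosts.
        is_dst = bool(datetime.now(eastern).dst())
        mt4_tz = 'Etc/GMT-3' if is_dst else 'Etc/GMT-2'
        return mt4_tz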
    
    def get_year_month_day():
        tz = timezone(get_mt4_timezone())
        current_mt4_datetime = datetime.now(tz)
        current_month = current_mt4_datetime.month
        current_year = current_mt4_datetime.year
        current_day = current_mt4_datetime.day
        return current_year, current_month, current_day
    
    def get_day_of_month_or_week(period='month'):
        year, month, day = get_year_month_day()
        first_business_day_next_month, first_business_day_following_week = first_business_days(year, month, day)
        day_of_month = first_business_day_next_month.day
        # crontab counts 0 as Sunday, while weekday() counts 0 as Monday
        day_of_week = first_business_day_following_week.isoweekday() % 7
        return day_of_month if period == 'month' else day_of_week
    
    
    @shared_task
    def update_beat_schedule():
        try:
            mt4_timezone = get_mt4_timezone()
            day_of_month = get_day_of_month_or_week('month')
            day_of_week = get_day_of_month_or_week('week')
    
            tasks_to_update = [
                {
                    'name': 'monthly_analysis', 
                    'task': 'signals_app.tasks.technical_analysis', 
                    'hour': 0, 
                    'timezone':mt4_timezone, 
                    'day_of_month': day_of_month, 
                    'args': (mt4_timezone,)
                },
                {
                    'name': 'weekly_analysis', 
                    'task': 'signals_app.tasks.technical_analysis', 
                    'hour': 0, 
                    'timezone':mt4_timezone, 
                    'day_of_week': day_of_week, 
                    'args': (mt4_timezone,)
                },
                {
                    'name': 'tokyo_bias', 
                    'task': 'signals_app.tasks.process_signal_tasks', 
                    'hour': 0, 
                    'timezone':mt4_timezone, 
                    'args': ('Tokyo', 'market_open_bias', mt4_timezone)
                },
                {
                    'name': 'london_bias', 
                    'task': 'signals_app.tasks.process_signal_tasks', 
                    'hour': 8, 
                    'timezone':mt4_timezone, 
                    'args': ('London', 'market_open_bias', mt4_timezone)
                },
                {
                    'name': 'ny_bias', 
                    'task': 'signals_app.tasks.process_signal_tasks', 
                    'hour': 12, 
                    'timezone':mt4_timezone, 
                    'args': ('NewYork', 'market_open_bias', mt4_timezone)
                },
                {
                    'name': 'nyse_bias', 
                    'task': 'signals_app.tasks.process_signal_tasks', 
                    'hour': 16, 
                    'timezone':mt4_timezone, 
                    'args': ('NYSE', 'market_open_bias', mt4_timezone)
                },
                {
                    'name': 'tokyo_market_open', 
                    'task': 'signals_app.tasks.process_signal_tasks', 
                    'hour': 9, 
                    'timezone':'Asia/Tokyo', 
                    'args': ('Tokyo', 'market_open', mt4_timezone)
                },
                {
                    'name': 'london_market_open', 
                    'task': 'signals_app.tasks.process_signal_tasks', 
                    'hour': 8,
                    'timezone':'Europe/London', 
                    'args': ('London', 'market_open', mt4_timezone)
                },
                {
                    'name': 'ny_market_open', 
                    'task': 'signals_app.tasks.process_signal_tasks', 
                    'hour': 8, 
                    'timezone':'US/Eastern', 
                    'args': ('NewYork', 'market_open', mt4_timezone)
                },
                {
                    'name': 'nyse_market_open', 
                    'task': 'signals_app.tasks.process_signal_tasks', 
                    'hour': 10, 
                    'timezone':'US/Eastern', 
                    'args': ('NYSE', 'market_open', mt4_timezone)
                }
            ]
    
            for task in tasks_to_update:
                # First, try to find the PeriodicTask by name.
                periodic_task = PeriodicTask.objects.filter(name=task['name']).first()
    
                if periodic_task:
                    # If it exists, update its CrontabSchedule
                    crontab = periodic_task.crontab
                    crontab.hour = task['hour']
                    crontab.minute = 0
                    crontab.day_of_month = task.get('day_of_month', '*')
                    crontab.day_of_week = task.get('day_of_week', '*')
                    crontab.timezone = task['timezone']
                    crontab.save()
                else:
                    # If it doesn't exist, create a new CrontabSchedule and PeriodicTask
                    crontab, _ = CrontabSchedule.objects.get_or_create(
                        hour=task['hour'],
                        minute=0,
                        day_of_month=task.get('day_of_month', '*'),
                        day_of_week=task.get('day_of_week', '*'),
                        timezone=task['timezone']
                    )
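                    PeriodicTask.objects.create(
                        name=task['name'],
                        task=task['task'],  # dotted path of the task to run
                        crontab=crontab,
                        args=json.dumps(task.get('args', []))
                    )
    
        except Exception as e:
            # `task` is only bound once the loop has started
            failed = task['name'] if 'task' in locals() else 'setup'
            raise UtilsAppError(f"Error updating beat schedule ({failed}): {e}") from e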
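
The first_business_days helper imported from utility_app.utils is not shown above. Here is a minimal sketch of what such a helper might look like, assuming it returns plain date objects and treats only Saturdays and Sundays as non-business days (public holidays are ignored). The helper names roll_to_weekday and cron_day_of_week are hypothetical. The sketch also shows the conversion from Python's weekday numbering to crontab numbering, since crontab counts 0 as Sunday while date.weekday() counts 0 as Monday.

```python
from datetime import date, timedelta


def roll_to_weekday(d):
    """Move a date forward to the next Monday if it falls on a weekend."""
    while d.weekday() >= 5:  # 5 = Saturday, 6 = Sunday
        d += timedelta(days=1)
    return d


def first_business_days(year, month, day):
    """Hypothetical stand-in for utility_app.utils.first_business_days.

    Returns a tuple of (first business day of next month,
    first business day of the following week).
    """
    today = date(year, month, day)

    # First calendar day of the next month, handling the December rollover
    if month == 12:
        first_of_next_month = date(year + 1, 1, 1)
    else:
        first_of_next_month = date(year, month + 1, 1)

    # Monday of the following week (a full week ahead if today is Monday)
    next_monday = today + timedelta(days=7 - today.weekday())

    return roll_to_weekday(first_of_next_month), next_monday


def cron_day_of_week(d):
    """Convert a date to crontab numbering (0 = Sunday ... 6 = Saturday)."""
    return d.isoweekday() % 7


next_month_day, next_week_day = first_business_days(2021, 12, 15)
print(next_month_day, next_week_day, cron_day_of_week(next_week_day))
```

A real implementation would probably consult a holiday calendar for the relevant market, which this sketch leaves out.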