django, celery, celeryd, celerybeat, djcelery

Django Celerybeat PeriodicTask running far more than expected


I'm struggling with Django, Celery, djcelery & PeriodicTasks.

I've created a task that pulls an AdSense report to generate live stats. Here is my task:

import datetime
import httplib2
import logging

from apiclient.discovery import build
from celery.task import PeriodicTask
from django.contrib.auth.models import User
from oauth2client.django_orm import Storage

from .models import Credential, Revenue


logger = logging.getLogger(__name__)


class GetReportTask(PeriodicTask):
    run_every = datetime.timedelta(minutes=2)

    def run(self, *args, **kwargs):
        scraper = Scraper()
        scraper.get_report()


class Scraper(object):
    TODAY = datetime.date.today()
    YESTERDAY = TODAY - datetime.timedelta(days=1)

    def get_report(self, start_date=YESTERDAY, end_date=TODAY):
        logger.info('Scraping Adsense report from {0} to {1}.'.format(
            start_date, end_date))
        user = User.objects.get(pk=1)
        storage = Storage(Credential, 'id', user, 'credential')
        credential = storage.get()
        if not credential is None and credential.invalid is False:
            http = httplib2.Http()
            http = credential.authorize(http)
            service = build('adsense', 'v1.2', http=http)
            reports = service.reports()
            report = reports.generate(
                startDate=start_date.strftime('%Y-%m-%d'),
                endDate=end_date.strftime('%Y-%m-%d'),
                dimension='DATE',
                metric='EARNINGS',
            )
            data = report.execute()
            for row in data['rows']:
                date = row[0]
                revenue = row[1]

                try:
                    record = Revenue.objects.get(date=date)
                except Revenue.DoesNotExist:
                    record = Revenue()
                record.date = date
                record.revenue = revenue
                record.save()
        else:
            logger.error('Invalid Adsense Credentials')

I'm using Celery & RabbitMQ. Here are my settings:

# Celery/RabbitMQ
BROKER_HOST = "localhost"
BROKER_PORT = 5672
BROKER_USER = "myuser"
BROKER_PASSWORD = "****"
BROKER_VHOST = "myvhost"
CELERYD_CONCURRENCY = 1
CELERYD_NODES = "w1"
CELERY_RESULT_BACKEND = "amqp"
CELERY_TIMEZONE = 'America/Denver'
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'

import djcelery
djcelery.setup_loader()

At first glance everything seems to work, but after turning on the logger and watching it run, I found that it runs the task at least four times in a row, sometimes more. It also seems to run every minute instead of every two minutes. I've tried changing run_every to a crontab schedule, but I get the same results.

I'm starting celerybeat using supervisor. Here is the command I use:

python manage.py celeryd -B -E -c 1

Any ideas as to why it's not working as expected?

One more thing: after the day changes, the task keeps using the date range from its first run. As the days go by, it keeps fetching stats for the day the task started running, unless I run the task manually at some point; then it switches to the date of my last manual run. Can someone tell me why this happens?


Solution

  • Consider creating a separate queue for this type of task, served by a single worker process with a fixed rate limit, and have celerybeat only add tasks to that queue instead of running them directly. That should help you figure out what is wrong: whether the problem lies with celerybeat or your tasks are running longer than expected.

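    import datetime

    from celery.task import PeriodicTask, task

    # Scraper is the class defined in the question's tasks module.

    @task(queue='create_report', rate_limit='0.5/m')
    def create_report():
        # Runs on the dedicated "create_report" queue, at most once every 2 minutes per worker.
        scraper = Scraper()
        scraper.get_report()

    class GetReportTask(PeriodicTask):
        run_every = datetime.timedelta(minutes=2)

        def run(self, *args, **kwargs):
            # celerybeat only enqueues the work; the dedicated worker executes it.
            create_report.delay()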
    

    In settings.py:

       CELERY_ROUTES = {
         'myapp.tasks.create_report': {'queue': 'create_report'},
       }
    

    Start an additional Celery worker that handles only the tasks in that queue:

    celery worker -c 1 -Q create_report -n create_report.local
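As a rough check of what `rate_limit='0.5/m'` buys you: Celery rate-limit strings take the form `N/s`, `N/m` or `N/h` (tasks per second, minute or hour), and a bare number means tasks per second. So `0.5/m` allows at most one task every two minutes. Celery applies this limit per worker instance, not globally, which is why a single dedicated worker matters here. The `min_interval_seconds` helper below is hypothetical and only mirrors that documented string format; it is not part of Celery.

```python
# Hypothetical helper (not a Celery API): converts a Celery-style
# rate-limit string ("N/s", "N/m", "N/h", or a bare number meaning
# tasks per second) into the minimum gap, in seconds, between tasks.
SECONDS_PER_UNIT = {"s": 1, "m": 60, "h": 3600}


def min_interval_seconds(rate):
    """Return the smallest gap (in seconds) the rate limit allows between tasks."""
    count, _, unit = str(rate).partition("/")
    tasks = float(count)
    if tasks <= 0:
        # This sketch only handles positive rates.
        raise ValueError("rate must be positive")
    # No unit suffix means "tasks per second".
    period = SECONDS_PER_UNIT[unit or "s"]
    return period / tasks


if __name__ == "__main__":
    print(min_interval_seconds("0.5/m"))  # 120.0 -> one task every two minutes
```

With the worker above started with `-c 1`, two `create_report` runs should therefore never start less than about 120 seconds apart. If the logs still show bursts, the duplicates are being produced before the queue, which points at the scheduling side.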

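    Problem 2: your YESTERDAY and TODAY values are computed at class level and then used as default argument values of get_report. Both are evaluated only once, when the module is imported, so every run in the same worker process reuses the dates from that first import. Compute the dates inside get_report instead.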
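A minimal sketch of the difference, using plain-Python stand-ins for the question's `Scraper` so it runs without Django or the AdSense API. `StaleScraper`, the `report_range` method and its `today` parameter are hypothetical names added for illustration; `today` exists only so the date logic can be tested without waiting for midnight.

```python
import datetime


class StaleScraper(object):
    # Pattern from the question: these lines run once, when the class
    # body is executed at import time, not on every task run.
    TODAY = datetime.date.today()
    YESTERDAY = TODAY - datetime.timedelta(days=1)

    # Default argument values are likewise evaluated once, when "def" runs.
    def report_range(self, start_date=YESTERDAY, end_date=TODAY):
        return start_date, end_date


class Scraper(object):
    def report_range(self, start_date=None, end_date=None, today=None):
        """Return (start_date, end_date), computing defaults at call time.

        ``today`` is a hypothetical hook for testing; leave it as None
        in production so the current date is read on every call.
        """
        if today is None:
            today = datetime.date.today()
        if end_date is None:
            end_date = today
        if start_date is None:
            # Default window: the day before end_date through end_date.
            start_date = end_date - datetime.timedelta(days=1)
        return start_date, end_date


if __name__ == "__main__":
    # Simulate a worker that keeps running for a few more days.
    later = datetime.date.today() + datetime.timedelta(days=3)
    print(StaleScraper().report_range())        # still the import-time dates
    print(Scraper().report_range(today=later))  # follows the supplied date
```

In the real task, get_report would take `None` defaults the same way (or call `datetime.date.today()` in its body), so a long-running celeryd process picks up the new date after midnight without a restart.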