Tags: python, celery, django-celery, worker

Celery worker only works once


Full steps:

  1. Start Django.

  2. Start a Celery worker:

    python manage.py celery worker --app=celery_worker:app -Ofair -n W1

  3. Upload a URL list file; the view loops over the list and sends each URL to the task fetch_article.

  4. The worker processes the tasks.

  5. Upload another URL list file.

  6. The worker does nothing.

views.py:

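import csv

from django.views.decorators.csrf import csrf_exempt

# Module paths below are assumed. JsonResponse and JsonStatus appear to be
# project-specific helpers (Django's own JsonResponse has no msg= argument),
# so their imports are not shown.
from .models import Job
from .tasks import fetch_article


@csrf_exempt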
def upload(request):

    job_name = request.POST.get('job_name')
    if not job_name:
        return JsonResponse(JsonStatus.Error)

    if len(request.FILES) == 1:
        # dict views are not indexable on Python 3; list() works on 2 and 3
        yq_data = list(request.FILES.values())[0]
    else:
        return JsonResponse(JsonStatus.Error)

    job = Job.objects.create(name=job_name)

    reader = csv.reader(yq_data, delimiter=',')

    task_count = 0

    next(reader)  # skip the CSV header row
    for row in reader:
        url = row[0].strip()
        fetch_article.delay(job.id, url)
        # fetch_article.apply_async(args=[job.id, url], queue=job.queue_name)
        task_count += 1


    # print 'qn%s' % job.queue_name
    # rp = celery_app.control.add_consumer(queue=job.queue_name, reply=True)
    # print rp

    job.task_count = task_count
    job.save()

    return JsonResponse(JsonStatus.OK, msg=task_count)
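The view hands the uploaded file straight to csv.reader. That works on Python 2, but on Python 3 the upload yields bytes and csv.reader refuses them ("iterator should return strings, not bytes"). A minimal sketch of a Python 3-safe way to pull out the URLs, assuming UTF-8 input with the URL in the first column; iter_urls is a hypothetical helper name, not part of Django or Celery:

```python
import codecs
import csv
from typing import Iterable, Iterator


def iter_urls(lines: Iterable[bytes], encoding: str = "utf-8") -> Iterator[str]:
    """Yield the stripped first column of each CSV row, skipping the header.

    `lines` is any iterable of byte lines, e.g. an io.BytesIO or (assumed)
    a Django UploadedFile, both of which yield bytes line by line.
    """
    # csv.reader needs str on Python 3, so decode the bytes incrementally.
    reader = csv.reader(codecs.iterdecode(lines, encoding), delimiter=",")
    next(reader, None)  # skip the header row; no error if the file is empty
    for row in reader:
        if row and row[0].strip():  # ignore blank lines and empty first cells
            yield row[0].strip()
```

In the view, the loop would then read: for url in iter_urls(yq_data): fetch_article.delay(job.id, url).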

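tasks.py:

from celery import shared_task
from celery.utils.log import get_task_logger

# Article and get_processor_cls are project code that is not shown here.
logger = get_task_logger(__name__)


@shared_task()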
def fetch_article(job_id, url):

    logger.info(u'fetch_article:%s' % url)

    Processer = get_processor_cls(url)

    a = Article(job_id=job_id, url=url)
    try:
        ap = Processer(url)
        title, text = ap.process()
        a.title = title
        a.content = text

    except Exception as e:
        a.status = 2
        a.error = e
        logger.error(u'fetch_article:%s error:%s' % (url, e))

    a.save()

Solution

  • OK, I found the problem.

    I had set CELERY_ALWAYS_EAGER = True in settings, so the tasks ran inside the Django process itself and the worker never received any of them.

    From the docs:

    CELERY_ALWAYS_EAGER If this is True, all tasks will be executed locally by blocking until the task returns. apply_async() and Task.delay() will return an EagerResult instance, which emulates the API and behavior of AsyncResult, except the result is already evaluated.

    That is, tasks will be executed locally instead of being sent to the queue.

    As for why the worker did work the first time, I am still confused; maybe some URLs were left over in a previous job's queue.
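To make the difference concrete, here is a toy model in plain Python of what delay() does with and without eager mode. It is not Celery's implementation; ToyApp, ToyTask and simulate are hypothetical names, and the queue only stands in for the broker. In eager mode the task body runs immediately inside the caller (the Django view) and nothing is ever queued, so the worker has nothing to consume.

```python
import queue
from typing import Any, Callable, List, Tuple


class ToyApp:
    """Toy stand-in for a Celery app; it only models where tasks run."""

    def __init__(self, always_eager: bool = False) -> None:
        self.always_eager = always_eager  # plays the role of CELERY_ALWAYS_EAGER
        self.broker: queue.Queue = queue.Queue()  # stands in for the broker's queue

    def task(self, fn: Callable[..., Any]) -> "ToyTask":
        return ToyTask(self, fn)

    def run_worker(self) -> int:
        """Consume every queued message, as a worker would; return how many ran."""
        ran = 0
        while True:
            try:
                fn, args = self.broker.get_nowait()
            except queue.Empty:
                return ran
            fn(*args)
            ran += 1


class ToyTask:
    def __init__(self, app: ToyApp, fn: Callable[..., Any]) -> None:
        self.app = app
        self.fn = fn

    def delay(self, *args: Any) -> Any:
        if self.app.always_eager:
            # Eager mode: run the task right now in the caller (the Django view)
            # and return its result; nothing is sent to the queue.
            return self.fn(*args)
        # Normal mode: only publish a message; a worker has to consume it.
        self.app.broker.put((self.fn, args))
        return None


def simulate(always_eager: bool, urls: List[str]) -> Tuple[int, int]:
    """Return (tasks run inside the view, tasks run by the worker)."""
    app = ToyApp(always_eager)
    done: List[str] = []
    fetch_article = app.task(lambda job_id, url: done.append(url))
    for url in urls:  # the loop from the upload view
        fetch_article.delay(1, url)
    ran_in_view = len(done)  # anything already done ran in the caller
    ran_by_worker = app.run_worker()
    return ran_in_view, ran_by_worker
```

With eager mode on, simulate(True, ["u1", "u2"]) gives (2, 0): both tasks ran in the view and the worker ran none, matching step 6. The fix is to remove CELERY_ALWAYS_EAGER from settings or set it to False. In Celery 4 and later the setting is named task_always_eager (CELERY_TASK_ALWAYS_EAGER when the app reads Django settings with the CELERY namespace).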