Full steps to reproduce:
Start Django.
Start a Celery worker:
python manage.py celery worker --app=celery_worker:app -Ofair -n W1
Upload a URL list file; the view loops over the list and sends each URL to a fetch_article task.
The worker processes the tasks.
Upload another URL list file.
The worker does nothing.
views.py:
@csrf_exempt
def upload(request):
    job_name = request.POST.get('job_name')
    if not job_name:
        return JsonResponse(JsonStatus.Error)
    if len(request.FILES) == 1:
        yq_data = request.FILES.values()[0]
    else:
        return JsonResponse(JsonStatus.Error)
    job = Job.objects.create(name=job_name)
    reader = csv.reader(yq_data, delimiter=',')
    task_count = 0
    next(reader)  # skip the header row
    for row in reader:
        url = row[0].strip()
        fetch_article.delay(job.id, url)
        # fetch_article.apply_async(args=[job.id, url], queue=job.queue_name)
        task_count += 1
        # print 'qn%s' % job.queue_name
        # rp = celery_app.control.add_consumer(queue=job.queue_name, reply=True)
        # print rp
    job.task_count = task_count
    job.save()
    return JsonResponse(JsonStatus.OK, msg=task_count)
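For context, the commented-out lines above hint at an alternative where each job gets its own queue. A rough sketch of that approach, assuming celery_app is the project's Celery instance and Job has a queue_name field (both implied by the comments, not shown here):

# Tell the already-running worker to also consume from this job's queue,
# then route every fetch_article task for that job onto it.
rp = celery_app.control.add_consumer(queue=job.queue_name, reply=True)
print rp  # one reply dict per worker when reply=True
for row in reader:
    url = row[0].strip()
    fetch_article.apply_async(args=[job.id, url], queue=job.queue_name)
    task_count += 1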
tasks.py:
@shared_task()
def fetch_article(job_id, url):
    logger.info(u'fetch_article:%s' % url)
    Processer = get_processor_cls(url)
    a = Article(job_id=job_id, url=url)
    try:
        ap = Processer(url)
        title, text = ap.process()
        a.title = title
        a.content = text
    except Exception as e:
        a.status = 2
        a.error = e
        logger.error(u'fetch_article:%s error:%s' % (url, e))
    a.save()
OK, I found the problem.
I had set CELERY_ALWAYS_EAGER = True in settings, so the tasks ran in the Django main process and the worker never received anything.
From the Celery docs:
CELERY_ALWAYS_EAGER: If this is True, all tasks will be executed locally by blocking until the task returns. apply_async() and Task.delay() will return an EagerResult instance, which emulates the API and behavior of AsyncResult, except the result is already evaluated.
That is, tasks will be executed locally instead of being sent to the queue.
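So the fix is simply to remove that setting (or only enable it for tests). A minimal settings sketch, assuming a single settings.py:

# settings.py
# Leave eager mode off in normal operation so .delay()/.apply_async()
# actually publish messages to the broker for the worker to pick up.
CELERY_ALWAYS_EAGER = False

# If synchronous execution is wanted only in tests, enable it there instead:
# CELERY_ALWAYS_EAGER = True
# CELERY_EAGER_PROPAGATES_EXCEPTIONS = True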
As for why the worker did process tasks the first time, I am still confused; maybe there were some URLs left over in a previous job's queue.
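A quick way to check which mode you are in is to look at what .delay() returns; this is just a sketch using the same fetch_article task:

from celery.result import EagerResult

result = fetch_article.delay(job.id, url)
# EagerResult means the task ran in-process (CELERY_ALWAYS_EAGER = True);
# a plain AsyncResult means it was sent to the broker for the worker.
print isinstance(result, EagerResult)

You can also run Celery's inspect commands against the running worker (for example celery --app=celery_worker:app inspect reserved) to see whether it is still holding tasks from an earlier job.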