I need some help regarding Celery workers. I'm especially not able to understand where (which directory) does the celery worker command needs to be fired from and what is the concept behind it and some things around imports.
So say I have the following directory structure :
.
├── __init__.py
├── entry.py
├── state1
│ ├── __init__.py
│ ├── family1
│ │ ├── __init__.py
│ │ ├── task1.py
│ │ ├── task2.py
│ │ └── task3.py
│ └── family2
│ ├── __init__.py
│ └── task1.py
└── state2
├── __init__.py
├── family1
│ ├── __init__.py
│ ├── task1.py
│ └── task2.py
└── family2
├── __init__.py
├── task1.py
└── task2.py
.
at the root is the current working directory, named project
each of the taskn.py (task1.py, task2.py etc) are the individual tasks. Each task file looks something like this:
from celery import Celery
from celery.result import AsyncResult
from kombu import Queue
_name_ = "project_x"
celapp=Celery(backend='redis://localhost:6379/0', broker='amqp://a:b@localhost/a_vhost')
CELERY_CONFIG = {
'CELERY_DEFAULT_QUEUE': 'default',
'CELERY_QUEUES': (Queue('q1'), Queue('q2'),),
'CELERY_TASK_SERIALIZER': 'pickle',
'CELERY_ACCEPT_CONTENT': ['json','pickle']
}
celapp.conf.update(**CELERY_CONFIG)
@celapp.task()
def t1():
print("starting task")
time.sleep(5)
print("Finished task")
The below is the content of entry.py
:
import json
from flask_cors import CORS
from flask import Flask, Response, render_template
from flask import request, jsonify, redirect
from functools import wraps
<what would be the import statement to import all the tasks>
_name_ = "project_x"
app = Flask(_name_)
@app.route("/api1", methods=['POST'])
def api1():
req = request.jsonify
if not req:
return jsonify(success=False, msg="Missing request parameters", code="1")
else:
param1 = req.get('p1')
param2 = req.get('p2')
tId = startTask()
return jsonify(success="True", msg="All Good", taskId=tId)
def startTask():
tId = "abcd123"
created_task = state1.family1.task1.subtask(queue='q1')
created_task.delay()
return tId
if __name__ == '__main__':
app.run(debug=True, host="192.168.1.7", port="4444")
the entry.py is the flask app from where api1 would be triggered and then depending on the parameters I would want to start a specific task.
Now here are my questions:
entry.py
file Celery -A <directory name> worker -l info
command and why ?So taking the advice from @Patricio, it seemed that it was indeed an import error. My new directory structure looks like the below:
.
├── __init__.py
├── celeryConfig
│ ├── __init__.py
│ └── celeryApp.py
├── entry.py
├── state1
│ ├── __init__.py
│ ├── family1
│ │ ├── __init__.py
│ │ ├── task1.py
│ │ ├── task2.py
│ │ └── task3.py
│ └── family2
│ ├── __init__.py
│ └── task1.py
└── state2
├── __init__.py
├── family1
│ ├── __init__.py
│ ├── task1.py
│ └── task2.py
└── family2
├── __init__.py
├── task1.py
└── task2.py
while contents of the celeryConfig/celeryApp.py
are the below:
from celery import Celery
from celery.result import AsyncResult
from kombu import Queue
_name_ = "project_x"
celapp=Celery(backend='redis://localhost:6379/0', broker='amqp://a:b@localhost/a_vhost', include=['state1.family1.task1'])
CELERY_CONFIG = {
'CELERY_DEFAULT_QUEUE': 'default',
'CELERY_QUEUES': (Queue('q1'), Queue('q2'),),
'CELERY_TASK_SERIALIZER': 'pickle',
'CELERY_ACCEPT_CONTENT': ['json','pickle']
}
celapp.conf.update(**CELERY_CONFIG)
and the contents of taskn.py is something like:
from celeryConfig.celeryApp import celapp
import time
@celapp.task()
def t1():
print("starting task")
time.sleep(5)
print("Finished task")
while entry.py
remains as it is, with just one change as below:
from state1.family1.task1 import t1
And now when celery is started as :
celery -A celeryConfig.celeryApp worker -l info
from the root directory, project
, it all runs fine. As output of the above command I get the message as
.
.
.
[tasks]
. state1.family1.task1.t1
.
.
.
indicating that the celery has started correctly and that the task has indeed been registered. So now, in order to get all the tasks registered I can read through the directory/directories and create the include
list in celeryApp.py
dynamically. (Will post more about it once done)
Thanks @Patricio