Tags: python, celery, celery-task, celeryd

Celery worker which directory to start from


I need some help regarding Celery workers. In particular, I am not able to understand from which directory the celery worker command needs to be run, what the concept behind it is, and a few things around imports.

So say I have the following directory structure:

.
├── __init__.py
├── entry.py
├── state1
│   ├── __init__.py
│   ├── family1
│   │   ├── __init__.py
│   │   ├── task1.py
│   │   ├── task2.py
│   │   └── task3.py
│   └── family2
│       ├── __init__.py
│       └── task1.py
└── state2
    ├── __init__.py
    ├── family1
    │   ├── __init__.py
    │   ├── task1.py
    │   └── task2.py
    └── family2
        ├── __init__.py
        ├── task1.py
        └── task2.py

The . at the root is the current working directory, named project.

Each of the taskn.py files (task1.py, task2.py, etc.) is an individual task. Each task file looks something like this:

from celery import Celery
from celery.result import AsyncResult
from kombu import Queue
import time

__name__ = "project_x"
celapp = Celery(backend='redis://localhost:6379/0', broker='amqp://a:b@localhost/a_vhost')
CELERY_CONFIG = {
    'CELERY_DEFAULT_QUEUE': 'default',
    'CELERY_QUEUES': (Queue('q1'), Queue('q2'),),
    'CELERY_TASK_SERIALIZER': 'pickle',
    'CELERY_ACCEPT_CONTENT': ['json','pickle']
}

celapp.conf.update(**CELERY_CONFIG)

@celapp.task()
def t1():
    print("starting task")
    time.sleep(5)
    print("Finished task")

Below is the content of entry.py:

import json
from flask_cors import CORS
from flask import Flask, Response, render_template
from flask import request, jsonify, redirect
from functools import wraps
# <what would be the import statement to import all the tasks?>

__name__ = "project_x"
app = Flask(__name__)

@app.route("/api1", methods=['POST'])
def api1():
    req = request.get_json()
    if not req:
        return jsonify(success=False, msg="Missing request parameters", code="1")
    else:
        param1 = req.get('p1')
        param2 = req.get('p2')
        tId = startTask()
        return jsonify(success="True", msg="All Good", taskId=tId)


def startTask():
    tId = "abcd123"
    created_task = state1.family1.task1.t1.subtask(queue='q1')
    created_task.delay()
    return tId


if __name__ == '__main__':
    app.run(debug=True, host="192.168.1.7", port="4444")

entry.py is the Flask app from which api1 would be triggered, and then, depending on the parameters, I would want to start a specific task.

Now here are my questions:

  1. What would be the import statement to import all the tasks in the entry.py file?
  2. From which directory do I start the worker? That is, from where should I run the celery -A <directory name> worker -l info command, and why?
  3. In many examples I saw a clear segregation between the tasks and the Celery app file. Could someone please suggest a better way to arrange my tasks, Celery configs, etc., and how the above two questions would align with this new proposed structure?

Solution

  • Taking the advice from @Patricio, it turned out that it was indeed an import error. My new directory structure looks like the below:

    .
    ├── __init__.py
    ├── celeryConfig
    │   ├── __init__.py
    │   └── celeryApp.py
    ├── entry.py
    ├── state1
    │   ├── __init__.py
    │   ├── family1
    │   │   ├── __init__.py
    │   │   ├── task1.py
    │   │   ├── task2.py
    │   │   └── task3.py
    │   └── family2
    │       ├── __init__.py
    │       └── task1.py
    └── state2
        ├── __init__.py
        ├── family1
        │   ├── __init__.py
        │   ├── task1.py
        │   └── task2.py
        └── family2
            ├── __init__.py
            ├── task1.py
            └── task2.py
    

    The contents of celeryConfig/celeryApp.py are as follows:

    from celery import Celery
    from celery.result import AsyncResult
    from kombu import Queue
    
    __name__ = "project_x"
    celapp = Celery(backend='redis://localhost:6379/0', broker='amqp://a:b@localhost/a_vhost', include=['state1.family1.task1'])
    CELERY_CONFIG = {
        'CELERY_DEFAULT_QUEUE': 'default',
        'CELERY_QUEUES': (Queue('q1'), Queue('q2'),),
        'CELERY_TASK_SERIALIZER': 'pickle',
        'CELERY_ACCEPT_CONTENT': ['json','pickle']
    }
    
    celapp.conf.update(**CELERY_CONFIG)
    

    and the contents of each taskn.py are something like:

    from celeryConfig.celeryApp import celapp
    import time
    
    @celapp.task()
    def t1():
        print("starting task")
        time.sleep(5)
        print("Finished task")
    

    while entry.py remains as it is, with just one change, shown below:

    from state1.family1.task1 import t1

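    For reference, here is a minimal sketch of how startTask in entry.py could then dispatch the imported task onto queue 'q1' (the queue name matches the CELERY_QUEUES config above; returning Celery's own task id instead of the hard-coded one is my own assumption, not part of the original code):

    from state1.family1.task1 import t1

    def startTask():
        # Route the task to queue 'q1', which is declared in CELERY_QUEUES.
        result = t1.apply_async(queue='q1')
        # Return the id Celery assigned to this task invocation.
        return result.id
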
    Now, when Celery is started as celery -A celeryConfig.celeryApp worker -l info from the root directory, project, it all runs fine. In the output of the above command I get a message like:

    .
    .
    .
    [tasks]
      . state1.family1.task1.t1
    
    .
    .
    .
    

    indicating that Celery has started correctly and that the task has indeed been registered. So now, in order to get all the tasks registered, I can read through the directories and build the include list in celeryApp.py dynamically. (Will post more about it once done.)
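
    As a rough sketch of that idea (the helper name and the hard-coded package list are illustrative assumptions, and it presumes the worker is started from the project root so that state1 and state2 are importable as top-level packages), celeryApp.py could build the include list like this:

    import importlib
    import pkgutil

    from celery import Celery

    def discover_task_modules(packages=("state1", "state2")):
        # Walk each package recursively and collect every submodule path,
        # e.g. 'state1.family1.task1', so it can be handed to include=.
        modules = []
        for pkg in packages:
            pkg_module = importlib.import_module(pkg)
            for _finder, name, is_pkg in pkgutil.walk_packages(
                    pkg_module.__path__, prefix=pkg + "."):
                if not is_pkg:
                    modules.append(name)
        return modules

    celapp = Celery(
        backend='redis://localhost:6379/0',
        broker='amqp://a:b@localhost/a_vhost',
        include=discover_task_modules(),
    )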

    Thanks @Patricio