I have developed a Flask application and want to send mail from it, so I chose to do this in a background process using Celery and RabbitMQ. I get the error below when the worker pulls a message from the queue.
celery = Celery(
    'sequre_spacement_backend',
    broker=app.config['QUEUE_BROKER_URL'],
    include=['sequre_spacement_backend.tasks.MeetingRoom.MeetingRoomSMSNotification'],
)
When I run the Celery worker, I get this error:
[2023-03-07 13:00:56,534: ERROR/MainProcess] Received unregistered task of type 'tasks.MeetingRoom.MeetingRoomSMSNotification.queue_test'.
The message has been ignored and discarded.
Did you remember to import the module containing this task?
Or maybe you're using relative imports?
Please see
http://docs.celeryq.org/en/latest/internals/protocol.html
for more information.
The full contents of the message body was:
'[[], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (77b)
Thw full contents of the message headers:
{'lang': 'py', 'task': 'tasks.MeetingRoom.MeetingRoomSMSNotification.queue_test', 'id': 'eacb73c8-4ce2-4c7d-9378-da36d221a465', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': 'eacb73c8-4ce2-4c7d-9378-da36d221a465', 'parent_id': None, 'argsrepr': '()', 'kwargsrepr': '{}', 'origin': 'gen137853@heptagon', 'ignore_result': False}
The delivery info for this task is:
{'consumer_tag': 'None4', 'delivery_tag': 2, 'redelivered': True, 'exchange': '', 'routing_key': 'celery'}
Traceback (most recent call last):
  File "/opt/lampp/htdocs/sequre_spacement_backend/venv/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 591, in on_task_received
    strategy = strategies[type_]
KeyError: 'tasks.MeetingRoom.MeetingRoomSMSNotification.queue_test'
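from celery import Celery
from flask import Flask
from flask_bcrypt import Bcrypt
from flask_cors import CORS
from flask_migrate import Migrate
from flask_sqlalchemy import SQLAlchemy

# Config, SqlDbConfig, FileSystemConfig, MongoDbConfig and PushNotification
# are project config classes; their imports are not shown in the original.
from config import QueueConfig

app = Flask(__name__, template_folder='../templates')
app.config.from_object(Config())
app.config.from_object(SqlDbConfig())
app.config.from_object(FileSystemConfig())
app.config.from_object(MongoDbConfig())
app.config.from_object(PushNotification())
app.config['MAX_CONTENT_LENGTH'] = 16 * 1000 * 1000
# Set before SQLAlchemy(app) so the extension sees the value.
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True

# flask-cors spells this option supports_credentials.
CORS(app, supports_credentials=True)
bcrypt = Bcrypt(app)

# Create db before Migrate so that Migrate receives a real SQLAlchemy object.
# SQLAlchemy(app) already binds the app, so no extra db.init_app(app) call.
db = SQLAlchemy(app, session_options={
    'autoflush': True,
    'expire_on_commit': True,
    'autocommit': True,
})
migrate = Migrate(app, db)

app.config.from_object(QueueConfig())
celery_prefix = app.config['CELERY_PREFIX']

celery = Celery(
    app.name,
    broker=app.config['QUEUE_BROKER_URL'],
    # Modules the worker imports at startup so their tasks get registered.
    include=[
        'tasks.MeetingRoom.MeetingRoomNotifications',
    ],
)
celery.conf.update(app.config)


class ContextTask(celery.Task):
    """Run every task inside the Flask application context."""

    def __call__(self, *args, **kwargs):
        with app.app_context():
            return self.run(*args, **kwargs)


celery.Task = ContextTask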
This worked for me. We need to set ContextTask as the Celery task base class, and we need to list the module that contains the task functions in the include argument when creating the Celery instance.
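To illustrate why the include path matters, here is a rough, stdlib-only model of the lookup that fails in the traceback (`strategies[type_]`). It is not Celery's code: `task_name`, `dispatch` and both registries are hypothetical names made up for this sketch. It relies on Celery's default of naming a task after its module path plus the function name, and it assumes the worker in the question imported the module through the `sequre_spacement_backend.` package path while the caller imported it as `tasks.`.

```python
# Simplified stand-in for a worker's task registry. NOT Celery's implementation;
# all names here are hypothetical and exist only to model the failing lookup.


def task_name(module_path, func_name):
    """Build a default-style task name: module path + function name."""
    return f"{module_path}.{func_name}"


def queue_test():
    """Placeholder task body."""
    return "sent"


# Assumption: the worker imported the module via the package path from `include`.
worker_registry = {
    task_name(
        "sequre_spacement_backend.tasks.MeetingRoom.MeetingRoomSMSNotification",
        "queue_test",
    ): queue_test,
}

# Name found in the message headers of the error log.
sent_name = task_name("tasks.MeetingRoom.MeetingRoomSMSNotification", "queue_test")


def dispatch(registry, name):
    """Look the task up by name, as the worker does when a message arrives."""
    try:
        handler = registry[name]
    except KeyError:
        return f"Received unregistered task of type {name!r}."
    return handler()


# Mismatched paths: the lookup fails even though the function exists.
mismatch_result = dispatch(worker_registry, sent_name)

# Same path on both sides: the lookup succeeds.
fixed_registry = {sent_name: queue_test}
fixed_result = dispatch(fixed_registry, sent_name)
```

In real Celery code the equivalent fix is to make the worker and the caller import the task module under the same path, or to give the task an explicit `name=` in the task decorator so the name no longer depends on the import path.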