Tags: rabbitmq, django-celery

Can't get Celery/RabbitMQ to run my shared task in Django


I have an application that is already deployed and working in production; however, that was all set up by someone else. I am now attempting to build a local version of the environment, and I can't seem to get my local Celery/RabbitMQ setup to actually run the task.

The application is very large, so I won't attempt to post it all here, but I have a few clues from my debugging that may be useful. One is this: when I run the following code:

task_id = celery_send_playbook_msg_util.apply_async(
    [brand_user.id, pb['id'], sequence_id, '', False, False, message_id,
     pb['playbook'], event_type == constants.event_types['Abandoned']],
    eta=delivery_datetime, queue='high_priority', priority=8)

print("Celery Task ID: " + str(task_id))

I actually do get a UUID-style task_id back, which indicates to me that the message is at least reaching the Celery broker. I have also tried the following configuration options for the broker (so far none have worked):

#BROKER_URL = 'amqp://test:test@192.168.33.10:5672//'
#BROKER_URL = 'amqp://test:test@localhost:5672//'
#BROKER_URL = 'amqp://test:test@localhost//'
BROKER_URL = 'amqp://test:test@192.168.33.10//'
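
As a sanity check on whichever of those BROKER_URL values is active, something like the following can be run from a Django shell. This is only a sketch: it assumes the Celery app instance is exposed as app in Python/celery.py, which the Python.celery.* task names in the worker output further down suggest.

from Python.celery import app  # assumed location and name of the Celery app instance

# Try to actually open a connection to the configured broker; this will raise
# if the host, credentials, or vhost are wrong.
with app.connection_for_write() as conn:
    conn.ensure_connection(max_retries=3)
    print("Broker reachable at:", conn.as_uri())  # as_uri() masks the password

If that succeeds, the problem is more likely on the consuming side than in BROKER_URL itself.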

Other clues:

It occurs to me that it might be helpful to see the output of the command I used to start the worker, so here it is:

celery -A Python worker --loglevel=debug

 -------------- celery@vagrant v4.2.1 (windowlicker)
---- **** -----
--- * ***  * -- Linux-4.15.0-29-generic-x86_64-with-Ubuntu-18.04-bionic 2019-08-02 20:42:29
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         Python:0x7fa1367f0650
- ** ---------- .> transport:   amqp://test:**@192.168.33.10:5672//
- ** ---------- .> results:     rpc://
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . Python.celery.debug_task
  . Python.celery.send_messages_daily_unreadcount
  . Sensus.tasks.bulk_manual_optin_from_csv_task
  . Sensus.tasks.celery_csv_upload_send_message
  . Sensus.tasks.celery_send_messages_daily_util
  . Sensus.tasks.celery_send_msg_util
  . Sensus.tasks.celery_send_payment_message
  . Sensus.tasks.celery_send_playbook_msg_util
  . Sensus.tasks.consolidate_messages_and_analyze_sentiment
  . Sensus.tasks.scheduled_broadcast_task
  . celery.accumulate
  . celery.backend_cleanup
  . celery.chain
  . celery.chord
  . celery.chord_unlock
  . celery.chunks
  . celery.group
  . celery.map
  . celery.starmap

[2019-08-02 20:42:29,590: DEBUG/MainProcess] Start from server, version: 0.9, properties: {'information': 'Licensed under the MPL.  See http://www.rabbitmq.com/', 'product': 'RabbitMQ', 'copyright': 'Copyright (C) 2007-2017 Pivotal Software, Inc.', 'capabilities': {'exchange_exchange_bindings': True, 'connection.blocked': True, 'authentication_failure_close': True, 'direct_reply_to': True, 'basic.nack': True, 'per_consumer_qos': True, 'consumer_priorities': True, 'consumer_cancel_notify': True, 'publisher_confirms': True}, 'cluster_name': '[email protected]', 'platform': 'Erlang/OTP', 'version': '3.6.10'}, mechanisms: ['PLAIN', 'AMQPLAIN'], locales: [u'en_US']
[2019-08-02 20:42:29,592: INFO/MainProcess] Connected to amqp://test:**@192.168.33.10:5672//
[2019-08-02 20:42:29,601: DEBUG/MainProcess] Start from server, version: 0.9, properties: {'information': 'Licensed under the MPL.  See http://www.rabbitmq.com/', 'product': 'RabbitMQ', 'copyright': 'Copyright (C) 2007-2017 Pivotal Software, Inc.', 'capabilities': {'exchange_exchange_bindings': True, 'connection.blocked': True, 'authentication_failure_close': True, 'direct_reply_to': True, 'basic.nack': True, 'per_consumer_qos': True, 'consumer_priorities': True, 'consumer_cancel_notify': True, 'publisher_confirms': True}, 'cluster_name': '[email protected]', 'platform': 'Erlang/OTP', 'version': '3.6.10'}, mechanisms: ['PLAIN', 'AMQPLAIN'], locales: [u'en_US']
[2019-08-02 20:42:29,603: INFO/MainProcess] mingle: searching for neighbors
[2019-08-02 20:42:29,604: DEBUG/MainProcess] using channel_id: 1
[2019-08-02 20:42:29,606: DEBUG/MainProcess] Channel open
[2019-08-02 20:42:29,621: DEBUG/MainProcess] Start from server, version: 0.9, properties: {'information': 'Licensed under the MPL.  See http://www.rabbitmq.com/', 'product': 'RabbitMQ', 'copyright': 'Copyright (C) 2007-2017 Pivotal Software, Inc.', 'capabilities': {'exchange_exchange_bindings': True, 'connection.blocked': True, 'authentication_failure_close': True, 'direct_reply_to': True, 'basic.nack': True, 'per_consumer_qos': True, 'consumer_priorities': True, 'consumer_cancel_notify': True, 'publisher_confirms': True}, 'cluster_name': '[email protected]', 'platform': 'Erlang/OTP', 'version': '3.6.10'}, mechanisms: ['PLAIN', 'AMQPLAIN'], locales: [u'en_US']
[2019-08-02 20:42:29,623: DEBUG/MainProcess] using channel_id: 1
[2019-08-02 20:42:29,624: DEBUG/MainProcess] Channel open
[2019-08-02 20:42:30,630: INFO/MainProcess] mingle: all alone
[2019-08-02 20:42:30,636: DEBUG/MainProcess] using channel_id: 2
[2019-08-02 20:42:30,637: DEBUG/MainProcess] Channel open
[2019-08-02 20:42:30,641: DEBUG/MainProcess] using channel_id: 3
[2019-08-02 20:42:30,642: DEBUG/MainProcess] Channel open
[2019-08-02 20:42:30,645: DEBUG/MainProcess] using channel_id: 1
[2019-08-02 20:42:30,646: DEBUG/MainProcess] Channel open
[2019-08-02 20:42:30,649: WARNING/MainProcess] /home/vagrant/.local/lib/python2.7/site-packages/celery/fixups/django.py:200: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments!
  warnings.warn('Using settings.DEBUG leads to a memory leak, never '
[2019-08-02 20:42:30,650: INFO/MainProcess] celery@vagrant ready.
[2019-08-02 20:42:30,651: DEBUG/MainProcess] basic.qos: prefetch_count->4
[2019-08-02 20:42:50,649: DEBUG/MainProcess] heartbeat_tick : for connection bcbf34af62f3488c8bbcee3f18b42621
[2019-08-02 20:42:50,651: DEBUG/MainProcess] heartbeat_tick : Prev sent/recv: None/None, now - 28/58, monotonic - 11221.8028604, last_heartbeat_sent - 11221.8028469, heartbeat int. - 60 for connection bcbf34af62f3488c8bbcee3f18b42621
[2019-08-02 20:43:10,655: DEBUG/MainProcess] heartbeat_tick : for connection bcbf34af62f3488c8bbcee3f18b42621
[2019-08-02 20:43:10,656: DEBUG/MainProcess] heartbeat_tick : Prev sent/recv: 28/58, now - 28/88, monotonic - 11241.8086932, last_heartbeat_sent - 11221.8028469, heartbeat int. - 60 for connection bcbf34af62f3488c8bbcee3f18b42621
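
One detail worth highlighting in that output: the [queues] section of the startup banner lists only the default celery queue. The same thing can be confirmed against a running worker through the inspect API (again just a sketch, using the same assumed app import as above):

from Python.celery import app  # assumed, as above

# Ask every running worker which queues it is currently consuming from.
replies = app.control.inspect().active_queues() or {}
for worker_name, queues in replies.items():
    print(worker_name, [q['name'] for q in queues])

With the worker started as shown above, this lists only 'celery'.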

Solution

  • So I figured it out. Here is how.

    First I used the following command to see what was in the queues:

    sudo rabbitmqctl list_queues
    

    This gave me the following output:

    Listing queues
    d68c3a7d-ed35-3c79-b571-0d01ccda84ad    1
    2753309c-9f03-399c-871d-5b4ffcbea462    0
    high_priority   23
    8ce8d7e0-0081-3937-80fb-ff238be8f410    1
    4ce2ecce-6954-3c07-857a-4221fe613e72    0
    celery  0
    [email protected]    0
    celeryev.1a7429e0-48b2-4ead-925c-42ee1855247d   0
    8127f8e8-073c-3972-a563-829ab207b964    0
    

    I was curious about the 23 next to 'high_priority', and I noticed that it kept going up every time I tried something that should have been put on the queue. As it turns out, when my application enqueues a task it does not use the default queue; it sends the task to a queue we have named 'high_priority' (the queue='high_priority' argument in the apply_async call above). Because I had not noticed this, I was starting my worker so that it only watched the default queue. To solve the problem I added a -Q option to the worker command, like so:

    celery -A Python worker --loglevel=debug -Q high_priority
    

    And this solved the problem. A couple of related sketches follow below: a programmatic queue-depth check and a settings-based alternative to the -Q flag.
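
    For completeness, the queue-depth check can also be done from Python rather than rabbitmqctl. As with the snippets in the question, this is only a sketch and assumes the Celery app is importable as app from Python.celery:

    from Python.celery import app  # assumed location of the Celery app instance

    with app.connection_for_write() as conn:
        # passive=True only inspects the queue; it will not create or alter it
        info = conn.default_channel.queue_declare('high_priority', passive=True)
        print(info.message_count)  # the count rabbitmqctl showed next to 'high_priority'

    And rather than remembering the flag every time, -Q also takes a comma-separated list (-Q celery,high_priority), or the queues can be declared once in the Django settings next to BROKER_URL. Here is a sketch using the old-style uppercase setting names that this project already uses elsewhere:

    from kombu import Queue

    CELERY_DEFAULT_QUEUE = 'celery'
    CELERY_QUEUES = (
        Queue('celery', routing_key='celery'),
        Queue('high_priority', routing_key='high_priority'),
    )

    With that in place, a plain celery -A Python worker consumes both queues without needing -Q.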