Tags: python, socket.io, rabbitmq, celery, session-timeout

Why does Celery retry, even though my job did not fail?


I have a Celery task that runs queries against a MySQL database, but it keeps hitting Lock wait timeout errors. After digging into the database queries, I realized that Celery triggered the same job again after 1800 sec, which caused the database issue. I don't know why – my job had not failed yet!

@celery.task(bind=True, acks_late=True)
def etl_pipeline(dev=dev, test=test):

I can tell that MySQL received the same query again, so it looks like Celery dispatched the same job twice. Why am I getting a retry here, when the default retry delay is 180 sec (3 min)?

Here is the official doc:

default_retry_delay = 180

Default time in seconds before a retry of the task should be executed. 3 minutes by default.

But in my case it is 1800 sec.
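
For reference, default_retry_delay only takes effect when the task itself calls self.retry() (directly, or via autoretry_for). A minimal sketch of what an explicit retry using that delay would look like; run_queries is a hypothetical stand-in for the ETL logic:

@celery.task(bind=True, acks_late=True, default_retry_delay=180, max_retries=3)
def etl_pipeline(self, dev=dev, test=test):
    try:
        run_queries(dev=dev, test=test)
    except Exception as ex:
        # self.retry() waits default_retry_delay (180 s) before re-queuing the task
        raise self.retry(exc=ex)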

Also, my broker logs another warning; I'm not sure if it is related:

The AMQP result backend is scheduled for deprecation in version 4.0 and removal in version v5.0. Please use RPC backend or a persistent backend.

RabbitMQ config:

RABBITMQ_SERVER = 'amqp://{}:{}@{}'.format(
    os.getenv('RABBITMQ_USER'),
    os.getenv('RABBITMQ_PASS'),
    os.getenv('RABBITMQ_HOST')
)
broker_url = '{}/{}'.format(
    RABBITMQ_SERVER,
    os.getenv('RABBITMQ_VHOST'),
)
backend = 'amqp'
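
Regarding the deprecation warning above: one option is to move results off the AMQP backend, as the message suggests. A minimal sketch, assuming Celery 4.x lowercase settings; the rpc backend sends results back as messages instead of creating one result queue per task:

# replaces `backend = 'amqp'`
result_backend = 'rpc://'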

How can I solve this? Thank you!

Celery: 4.2.0

I am using job = chain(single_job), but I only have the one single_job task starting the job.
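
A chain with a single signature behaves the same as dispatching that task directly, so the chain itself should not cause a second run. A minimal sketch, assuming single_job wraps the etl_pipeline task:

from celery import chain

# a one-element chain...
job = chain(etl_pipeline.s())
result = job.apply_async()

# ...is equivalent to dispatching the task on its own
result = etl_pipeline.apply_async()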

mysql> show processlist;
+-------+------+---------------+------------------+---------+------+-----------+
| Id    | User | Host          | db               | Command | Time | State     |
+-------+------+---------------+------------------+---------+------+-----------+
| 97189 | clp  | 172.11.17.202 | bain_ai_database | Query   |    0 | init      |
| 97488 | clp  | 172.11.11.252 | bain_ai_database | Query   | 1505 | executing |
| 97489 | clp  | 172.11.11.252 | bain_ai_database | Sleep   | 1851 |           |
| 97543 | clp  | 172.21.6.242  | bain_ai_database | Query   |   51 | updating  |
| 97544 | clp  | 172.21.6.242  | bain_ai_database | Sleep   |   51 |           |
+-------+------+---------------+------------------+---------+------+-----------+
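
To tie each delivery of the task to a row in show processlist, one option is to log MySQL's CONNECTION_ID() when the task starts. A minimal sketch assuming SQLAlchemy; the DSN is a placeholder:

from celery.utils.log import get_task_logger
from sqlalchemy import create_engine, text

log = get_task_logger(__name__)
engine = create_engine('mysql+pymysql://user:pass@host/dbname')  # placeholder DSN


def log_connection_id():
    # call this at the start of the task body to correlate the worker's
    # database session with an Id in `show processlist`
    with engine.connect() as conn:
        conn_id = conn.execute(text('SELECT CONNECTION_ID()')).scalar()
        log.info('[etl_pipeline] running on MySQL connection id %s', conn_id)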

Solution

  • Depending on how you execute the SQL query, here is what I would try. (1) Since you have bind=True, the task instance should be the first parameter to your function; the convention in Celery is to call that first parameter self. (2) You want to catch the database-level exception that is occurring so you can eventually ignore it; for now, log and re-raise it to see exactly what is happening.

    from celery.utils.log import get_task_logger

    log = get_task_logger(__name__)


    @celery.task(bind=True, acks_late=True)
    def etl_pipeline(self, dev=dev, test=test):
        try:
            # query the database here, e.g. via SQLAlchemy or mysql-connector;
            # run_queries is a placeholder for your actual ETL logic
            run_queries(dev=dev, test=test)
        except Exception as ex:
            # for now, log the exception and its type so that you can drill
            # down into what is happening
            log.info('[etl_pipeline] exception of type %s.%s: %s',
                     ex.__class__.__module__, ex.__class__.__name__, ex)
            raise

    The output from this logging should help you determine which error you are actually getting on the client side.
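
    Once the log output pins down the exception type, the handler can be narrowed to catch (and, per point (2) above, ignore) just that error. A sketch assuming SQLAlchemy, where MySQL's lock wait timeout (error 1205) surfaces as OperationalError; run_queries is again a hypothetical placeholder:

    from sqlalchemy.exc import OperationalError

    @celery.task(bind=True, acks_late=True)
    def etl_pipeline(self, dev=dev, test=test):
        try:
            run_queries(dev=dev, test=test)
        except OperationalError as ex:
            # MySQL error 1205 ('Lock wait timeout exceeded') lands here;
            # swallowing it keeps Celery from marking the task as failed
            log.warning('[etl_pipeline] ignoring lock wait timeout: %s', ex)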