Tags: flask, redis, docker-compose, celerybeat, kombu

Cannot route message for exchange 'reply.celery.pidbox': Table empty or key no longer exists


I'm trying to run a few background Celery beat processes with docker-compose, but they have stopped working. My configuration:

docker-compose-dev.yml

worker-periodic:
    image: dev3_web
    restart: always
    volumes:
      - ./services/web:/usr/src/app
      - ./services/web/celery_logs:/usr/src/app/celery_logs

    command: celery beat -A celery_worker.celery --schedule=/tmp/celerybeat-schedule --loglevel=DEBUG --pidfile=/tmp/celerybeat.pid
    environment:
      - CELERY_BROKER=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/0
      - FLASK_ENV=development
      - APP_SETTINGS=project.config.DevelopmentConfig
      - DATABASE_URL=postgres://postgres:postgres@web-db:5432/web_dev 
      - DATABASE_TEST_URL=postgres://postgres:postgres@web-db:5432/web_test
      - SECRET_KEY=my_precious  
    depends_on:
      - web
      - redis
      - web-db
    links:
      - redis:redis
      - web-db:web-db

After bringing the containers up, I run $ docker ps and get the following (note how worker-periodic_1 has always been up for only a few seconds, while the others have been up for minutes):

697322a621d5        dev3_web               "celery worker -A ce…"   24 hours ago        Up 5 minutes                                           dev3_worker-analysis_1
d8e414aa4e5b        dev3_web               "celery worker -A ce…"   24 hours ago        Up 5 minutes                                           dev3_worker-learning_1
ae327266132c        dev3_web               "flower -A celery_wo…"   24 hours ago        Up 5 minutes        0.0.0.0:5555->5555/tcp             dev3_monitor_1
3ccb79e01412        dev3_web               "celery beat -A cele…"   24 hours ago        Up 14 seconds                                          dev3_worker-periodic_1
a50e1276f692        dev3_web               "celery worker -A ce…"   24 hours ago        Up 5 minutes                                           dev3_worker-scraping_1

All Celery workers run their tasks when the endpoints are called; only the periodic tasks scheduled by Celery beat do not run. When I bring the containers up, the following errors appear in celery_logs/worker_analysis.log:

    [2019-11-16 23:29:20,880: DEBUG/MainProcess] pidbox received method hello(from_node='celery@d8e414aa4e5b', revoked={}) [reply_to:{'exchange': 'reply.celery.pidbox', 'routing_key': '85f4128f-2f75-3996-8375-2a19aa58d5d4'} ticket:0daa0dc4-fa09-438d-9003-ccbd39f259dd]
    [2019-11-16 23:29:20,907: INFO/MainProcess] sync with celery@d8e414aa4e5b
    [2019-11-16 23:29:21,018: ERROR/MainProcess] Control command error: OperationalError("\nCannot route message for exchange 'reply.celery.pidbox': Table empty or key no longer exists.\nProbably the key ('_kombu.binding.reply.celery.pidbox') has been removed from the Redis database.\n",)
    Traceback (most recent call last):
      File "/usr/lib/python3.6/site-packages/kombu/connection.py", line 439, in _reraise_as_library_errors
        yield
      File "/usr/lib/python3.6/site-packages/kombu/connection.py", line 518, in _ensured
        return fun(*args, **kwargs)
      File "/usr/lib/python3.6/site-packages/kombu/messaging.py", line 203, in _publish
        mandatory=mandatory, immediate=immediate,
      File "/usr/lib/python3.6/site-packages/kombu/transport/virtual/base.py", line 605, in basic_publish
        message, exchange, routing_key, **kwargs
      File "/usr/lib/python3.6/site-packages/kombu/transport/virtual/exchange.py", line 70, in deliver
        for queue in _lookup(exchange, routing_key):
      File "/usr/lib/python3.6/site-packages/kombu/transport/redis.py", line 877, in _lookup
        exchange, redis_key))
    kombu.exceptions.InconsistencyError: 
    Cannot route message for exchange 'reply.celery.pidbox': Table empty or key no longer exists.
    Probably the key ('_kombu.binding.reply.celery.pidbox') has been removed from the Redis database.


    During handling of the above exception, another exception occurred:

    Traceback (most recent call last):
      File "/usr/lib/python3.6/site-packages/celery/worker/pidbox.py", line 46, in on_message
        self.node.handle_message(body, message)
      File "/usr/lib/python3.6/site-packages/kombu/pidbox.py", line 145, in handle_message
        return self.dispatch(**body)
      File "/usr/lib/python3.6/site-packages/kombu/pidbox.py", line 115, in dispatch
        ticket=ticket)
      File "/usr/lib/python3.6/site-packages/kombu/pidbox.py", line 151, in reply
        serializer=self.mailbox.serializer)
      File "/usr/lib/python3.6/site-packages/kombu/pidbox.py", line 285, in _publish_reply
        **opts
      File "/usr/lib/python3.6/site-packages/kombu/messaging.py", line 181, in publish
        exchange_name, declare,
      File "/usr/lib/python3.6/site-packages/kombu/connection.py", line 551, in _ensured
        errback and errback(exc, 0)
      File "/usr/lib/python3.6/contextlib.py", line 99, in __exit__
        self.gen.throw(type, value, traceback)
      File "/usr/lib/python3.6/site-packages/kombu/connection.py", line 444, in _reraise_as_library_errors
        sys.exc_info()[2])
      File "/usr/lib/python3.6/site-packages/vine/five.py", line 194, in reraise
        raise value.with_traceback(tb)
      File "/usr/lib/python3.6/site-packages/kombu/connection.py", line 439, in _reraise_as_library_errors
        yield
      File "/usr/lib/python3.6/site-packages/kombu/connection.py", line 518, in _ensured
        return fun(*args, **kwargs)
      File "/usr/lib/python3.6/site-packages/kombu/messaging.py", line 203, in _publish
        mandatory=mandatory, immediate=immediate,
      File "/usr/lib/python3.6/site-packages/kombu/transport/virtual/base.py", line 605, in basic_publish
        message, exchange, routing_key, **kwargs
      File "/usr/lib/python3.6/site-packages/kombu/transport/virtual/exchange.py", line 70, in deliver
        for queue in _lookup(exchange, routing_key):
      File "/usr/lib/python3.6/site-packages/kombu/transport/redis.py", line 877, in _lookup
        exchange, redis_key))
    kombu.exceptions.OperationalError: 
    Cannot route message for exchange 'reply.celery.pidbox': Table empty or key no longer exists.
    Probably the key ('_kombu.binding.reply.celery.pidbox') has been removed from the Redis database.
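
The error names the Redis key '_kombu.binding.reply.celery.pidbox', so one way to confirm the message is to look at that key directly. The following is only a minimal sketch: it assumes the redis-py client and assumes the binding is stored as a Redis set. The helper names binding_key and inspect_binding are hypothetical, not part of kombu or redis-py.

```python
def binding_key(exchange, prefix="_kombu.binding."):
    """Build the Redis key that the error message names for an exchange."""
    return prefix + exchange


def inspect_binding(client, exchange):
    """Return (exists, members) for the binding key of `exchange`.

    `client` is any object with redis-py style `exists` and `smembers`
    methods. Assumption: the binding is a Redis set.
    """
    key = binding_key(exchange)
    exists = bool(client.exists(key))
    if not exists:
        return False, []
    members = sorted(
        m.decode() if isinstance(m, bytes) else m
        for m in client.smembers(key)
    )
    return True, members


# Possible usage against the broker from the compose file (needs redis-py
# and a reachable Redis, so it is left commented out here):
#
#   import redis
#   client = redis.Redis.from_url("redis://redis:6379/0")
#   print(inspect_binding(client, "reply.celery.pidbox"))
```

If the first element of the result is False while a worker is replying to a control command, that would match what the traceback reports.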

This is how Celery is configured:

web/project/config.py:

import os

from celery.schedules import crontab


class DevelopmentConfig(BaseConfig):
    # CELERY
    INSTALLED_APPS = ['routes']
    # celery config
    CELERYD_CONCURRENCY = 4
    # Add a one-minute timeout to all Celery tasks.
    CELERYD_TASK_SOFT_TIME_LIMIT = 60
    CELERY_ENABLE_UTC = False
    CELERY_TIMEZONE = 'America/Sao_Paulo'
    CELERY_BROKER_URL = os.environ.get('CELERY_BROKER')
    CELERY_RESULT_BACKEND = os.environ.get('CELERY_RESULT_BACKEND')
    CELERY_IMPORTS = ('project.api.routes.background',)
    # periodic tasks
    CELERYBEAT_SCHEDULE = {
        'playlist_generator_with_audio_features': {
            'task': 'project.api.routes.background.playlist_generator_with_audio_features',
            # At minute 59 of every hour
            'schedule': crontab(minute=59),
            'args': ['user_id'],
        },
        'cache_user_tracks_with_analysis': {
            'task': 'project.api.routes.background.cache_user_tracks_with_analysis',
            # Every hour, on the hour
            'schedule': crontab(minute=0, hour='*/1'),
            'args': ('user_id', 'token'),
        },
    }
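
For reference, the minute field decides how often a schedule entry fires: crontab(minute=59) matches only minute 59 of each hour, whereas crontab() with no arguments matches every minute. The sketch below is a simplified pure-Python stand-in for that expansion, not Celery's own parser. expand_minute_field is a hypothetical helper that handles only an integer, '*', and '*/n'.

```python
def expand_minute_field(spec):
    """Expand a cron-style minute field into the sorted minutes it matches.

    Simplified illustration: accepts an int, '*', or '*/n' only.
    """
    if isinstance(spec, int):
        if not 0 <= spec <= 59:
            raise ValueError("minute must be in 0..59")
        return [spec]
    if spec == "*":
        return list(range(60))
    if spec.startswith("*/"):
        step = int(spec[2:])
        if step <= 0:
            raise ValueError("step must be positive")
        return list(range(0, 60, step))
    raise ValueError("unsupported minute spec: %r" % (spec,))


print(expand_minute_field(59))        # [59]
print(len(expand_minute_field("*")))  # 60
print(expand_minute_field("*/15"))    # [0, 15, 30, 45]
```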

This is an example task in project/api/routes/background.py, on my Flask server:

@celery.task(queue='analysis', default_retry_delay=30, max_retries=3, soft_time_limit=1000)
def cache_user_tracks_with_analysis(user_id, token):
    # business logic (elided) that produces `results`
    return {'Status': 'Task completed!',
            'features': results}

In my requirements.txt, kombu is not declared explicitly; I have:

celery==4.2.1
redis==3.2.0

What am I missing?


Solution

  • This is an open celery/kombu issue: https://github.com/celery/kombu/issues/1063

    Explicitly downgrading to kombu==4.5.0 (for example, by pinning it in requirements.txt) fixed the error for me.
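
Since the question's requirements.txt lists only celery and redis, kombu comes in as an unpinned dependency of celery. A small check like the one below can confirm whether a package is pinned before rebuilding the image. It is a sketch only: pinned_version is a hypothetical helper, and it recognises plain 'name==version' lines and nothing more.

```python
import re


def pinned_version(requirements_text, package):
    """Return the version pinned with '==' for `package`, or None if absent."""
    pattern = re.compile(
        r"^\s*" + re.escape(package) + r"\s*==\s*([^\s#;]+)",
        re.IGNORECASE,
    )
    for line in requirements_text.splitlines():
        match = pattern.match(line)
        if match:
            return match.group(1)
    return None


# The requirements from the question, before and after adding the pin.
requirements = "celery==4.2.1\nredis==3.2.0\n"
print(pinned_version(requirements, "kombu"))                     # None
print(pinned_version(requirements + "kombu==4.5.0\n", "kombu"))  # 4.5.0
```

After adding the pin, the image has to be rebuilt so that the containers pick up the downgraded kombu.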