django apache-kafka celery celeryd

Continuously running Celery Task in Django


I have a Django app that should constantly listen for messages from Kafka and then send them to clients via WebSocket. The problem is how to set up a constantly running listener. For future scalability, we decided to bring Celery into the project to handle scaling.

My task currently looks like this:

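import logging

from celery import Task
from django.conf import settings

logger = logging.getLogger(__name__)


class ConsumerTask(Task):
    name = 'consume_messages'

    def run(self, *args, **kwargs):
        # get_kafka_consumer, save_to_db, send_via_websocket_messages and
        # self.get_message are project helpers defined elsewhere
        consumer = get_kafka_consumer(settings.KAFKA_URL,
                                      settings.FAULT_MESSAGES_KAFKA_TOPIC,
                                      'consumer_messages_group')
        logger.info("Kafka's consumer has been started")

        while True:
            # poll() returns a dict: {partition: [records, ...]}
            batches = consumer.poll()
            for _, records in batches.items():
                # use new names instead of shadowing the dict being iterated
                messages, messages_count = self.get_message(records)
                if messages_count > 0:
                    saved = save_to_db(messages)
                    send_via_websocket_messages(saved)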

It correctly saves the messages and sends them via WebSocket, but the problem is the infinite loop in the task. For some reason (probably a task time limit), the task drops out of the queue and never runs again. I am not sure that daemonizing the Celery workers will solve this. Could you suggest some strategies for organizing the "constantly running" part of this process?


Solution

  • Your use case does not fit Celery tasks well. Celery tasks should not be long-running processes. A task also has to be put into the broker queue before a worker picks it up, which makes no sense in your setup.

    Think of your while-True loop as a Celery worker instead. A worker is supposed to run constantly, and it is also the process you scale up when you need to handle more work.

    Write a Django management command that runs your while-True loop, and scale it the way you would scale Celery workers: run multiple instances of that management command.

    Use a process management tool such as honcho or supervisord to keep those processes running and to scale them up.
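Below is a minimal sketch of the loop you would move into such a management command. It assumes the consumer is kafka-python's `KafkaConsumer`, whose `poll()` returns a dict mapping partitions to lists of records. The names `consume_forever` and `install_stop_handlers` are hypothetical. The loop takes `poll` and the record handler as plain callables, so it can be exercised without Kafka or Django. A `threading.Event` lets SIGTERM (the signal supervisord sends by default when stopping a program) end the loop cleanly after the current batch, rather than killing the process in the middle of a database write.

```python
import signal
import threading
from typing import Callable, Iterable, Mapping


def consume_forever(
    poll: Callable[[], Mapping[object, Iterable[object]]],
    handle_records: Callable[[list], None],
    stop: threading.Event,
) -> int:
    """Poll until `stop` is set; return how many records were handled.

    `poll` stands in for a Kafka consumer's poll() (assumption: it returns
    a mapping of partition -> records, as kafka-python's poll() does).
    Pass a poll with a timeout, e.g. lambda: consumer.poll(timeout_ms=1000),
    so the loop waits briefly instead of spinning and still checks `stop`.
    """
    handled = 0
    while not stop.is_set():
        for _partition, records in poll().items():
            records = list(records)
            if records:  # skip partitions that returned nothing
                handle_records(records)
                handled += len(records)
    return handled


def install_stop_handlers(stop: threading.Event) -> None:
    """Set `stop` on SIGTERM/SIGINT so the loop finishes its current batch.

    Must be called from the main thread (a Python signal module rule).
    """
    def _handler(signum, frame):
        stop.set()

    signal.signal(signal.SIGTERM, _handler)
    signal.signal(signal.SIGINT, _handler)


if __name__ == "__main__":
    # Demo with a fake poll(): two batches, then request shutdown.
    batches = [{"p0": ["a", "b"]}, {"p0": [], "p1": ["c"]}]
    seen = []
    stop_event = threading.Event()

    def fake_poll():
        if not batches:
            stop_event.set()
            return {}
        return batches.pop(0)

    total = consume_forever(fake_poll, seen.extend, stop_event)
    print(total, seen)  # 3 ['a', 'b', 'c']
```

In a Django project this would live in a file such as `yourapp/management/commands/consume_messages.py` (the app and command names are placeholders). That file would define a `BaseCommand` subclass whose `handle()` builds the consumer with `get_kafka_consumer(...)`, calls `install_stop_handlers(stop)`, and then calls `consume_forever(lambda: consumer.poll(timeout_ms=1000), handler, stop)`. In that call, `handler` wraps the question's save-and-send logic. Each `python manage.py consume_messages` process is one consumer. supervisord's `numprocs` option or honcho's `-c`/`--concurrency` option can then run several of them. If all instances share the same consumer group, Kafka spreads the topic's partitions across them, so any instances beyond the number of partitions sit idle.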