Search code examples
pythondjangoceleryazure-servicebus-queues

How to replace celery task with azure service bus in a django application?


I am asked to use the azure service bus instead of celery in a Django application.

Read the documentation provided but didn't get a clear picture of using service bus instead of a celery task. Any advice provided would be of great help.


Solution

  • Before getting into it, I would like to highlight the differences between Azure Service Bus and Celery.

    Azure Service Bus :

    Microsoft Azure Service Bus is a fully managed enterprise integration message broker.

    You could refer this to know more about the service bus

    Celery :

    Distributed task queue. Celery is an asynchronous task queue/job queue based on distributed message passing.

    I could think of 2 possibilities in your case :

    1. You would like to use Service Bus with Celery in place of other message brokers.
    2. Replace Celery with the Service Bus

    1 : You would like to use Service Bus with Celery in place of other message brokers.

    You could refer this to understand why celery needs a message broker. I am not sure which messaging broker you are using currently, but you could use the Kombu library to meet your requirement.

    Reference for Azure Service Bus : https://docs.celeryproject.org/projects/kombu/en/stable/reference/kombu.transport.azureservicebus.html

    Reference for others : https://docs.celeryproject.org/projects/kombu/en/stable/reference/index.html

    2 : Replace Celery with the Service Bus completely To meet your requirement :

    Consider

    • Message senders are producers
    • Message receivers are consumers

    These are two different application that you will have to work on.

    You could refer the below to get more sample code to build on.

    https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/servicebus/azure-servicebus/samples

    Explanation :

    • Every time you would like to execute the actions, you could send messages to a topic from the producer client.
    • The Consumer Client - the application that is listening, will receive the message and process the same. You could attach your custom process to it - in that way the your custom process gets executed whenever a message is received at the consumer client end.

    The below is sample of the receiving client :

    from azure.servicebus.aio import SubscriptionClient
    import asyncio
    import nest_asyncio
    nest_asyncio.apply()
    
            
    Receiving = True
    
    #Topic 1 receiver : 
    conn_str= "<>"
    name="Allmessages1"
    SubsClient = SubscriptionClient.from_connection_string(conn_str, name)
    receiver =  SubsClient.get_receiver()
    
    async def receive_message_from1():
        await receiver.open()
        print("Opening the Receiver for Topic1")
        async with receiver:
          while(Receiving):
            msgs =  await receiver.fetch_next()
            for m in msgs:
                print("Received the message from topic 1.....")
                ##### - Your code to execute when a message is received - ########
                print(str(m))
                ##### - Your code to execute when a message is received - ########
                await m.complete()
                
                
    loop = asyncio.get_event_loop()
    topic1receiver = loop.create_task(receive_message_from1())
    

    the section between the below line would be instruction that will be executed every time a message is received.

    ##### - Your code to execute when a message is received - ########