Tags: python, multithreading, opencv, celery, flower

Sending inputs and image processing jobs to multiple machines using Celery and Python


Recently I have been playing with Celery and Flower (for the dashboard and task visualisation) on a single machine running Ubuntu, using Python 3.x. First I installed rabbitmq-server, Redis, Celery and Flower. Then I created a script called tasks.py containing the following:

from celery import Celery

# py-advanced-message-queuing-protocol
app = Celery('tasks', backend='redis://localhost', broker='pyamqp://localhost//')

@app.task
def intensive_sum1(num):
    val = sum(x**4 for x in range(num))
    return val


@app.task
def intensive_sum2(num):
    val = sum(x**4 for x in range(num))
    return val

@app.task
def intensive_sum3(num):
    val = sum(x**4 for x in range(num))
    return val

Then I created a script called run.py containing:

from tasks import intensive_sum1, intensive_sum2, intensive_sum3
import time

start = time.time()
result1 = intensive_sum1.delay(100000000)
result2 = intensive_sum2.delay(100000000)
result3 = intensive_sum3.delay(100000000)
print(result1.get(), result2.get(), result3.get())
end = time.time()
print('time: ', end - start)

start = time.time()
result1 = sum(x**4 for x in range(100000000))
result2 = sum(x**4 for x in range(100000000))
result3 = sum(x**4 for x in range(100000000))
print(result1, result2, result3)
end = time.time()
print('time: ', end - start)

Before running the latter, I opened two terminals and changed to the directory containing the two scripts. I then ran sudo celery -A tasks flower in one terminal and celery -A tasks worker --loglevel=info in the other. It turns out that (surprise, surprise) Celery can distribute each task to an individual core, resulting in a tremendous time saving. Of course, this saving is only to be expected for long-running functions; for small ones, the overhead of sending each task through the broker to a worker process outweighs any benefit.

This has made me think about another problem. Let's say that instead of a single machine I have 3 machines connected to the same WiFi router. I can work out the IP address of each of these Ubuntu machines using the ifconfig command. Let us say that one of these machines is a master machine that contains a main.py script, which captures real-time images using an OpenCV-Python capture object. It then takes each image, serialises it and sends it as a message to two worker machines. Both worker machines work independently and both de-serialise the same image. One worker machine does cat classification and returns a probability of cat; the other does dog classification and returns a probability of dog. One worker machine may take longer to reach a conclusion than the other. However, for that particular frame the master machine needs to wait for both classification results before overlaying some results on top of that frame.

Instinctively, I am led to believe that the master machine needs to check whether both jobs are ready before moving forward (e.g. result_worker_one.ready() == result_worker_two.ready() == True). How can I achieve this behaviour? How can I serialise one RGB image on the master machine and de-serialise it on the worker machines? What backend and broker does each machine need? How can this be set up as a client-server architecture?


Solution

  • You are correct about distributing jobs across multiple machines. In fact, it is one of the main purposes of Celery.

    1. To check whether two asynchronous jobs have finished, you can use Celery's groups and chords. Assume your two Celery tasks are as follows:

      @app.task
      def check_dog():
          # dog classification code goes here
          pass

      @app.task
      def check_cat():
          # cat classification code goes here
          pass
      

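      You can group these tasks and then use a chord (a chord is a task that executes only after all of the tasks in a group have finished) to move on to the next step once both functions have run. Put whatever needs to happen after the two tasks into the callback task passed to the chord, as in the line below; the callback receives the list of both results as its first argument. Note that the chord header takes task signatures (check_dog.s()) rather than direct calls (check_dog()), which would simply run the functions locally. Related documentation can be found here: http://docs.celeryproject.org/en/master/userguide/canvas.html#groups

      chord([check_dog.s(), check_cat.s()])(callback.s())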
      
    2. For the image serialisation part, take a look at this question: "Passing an image to a celery task".

    3. To answer the third part of the question: Celery inherently follows a client-server architecture to support parallel computing. Whenever you call a Celery task, it places a message on the message broker you have configured (in your case, RabbitMQ). The message contains the name of the task to run along with all the required arguments. The message queue delivers messages to Celery workers across different machines, and once a worker gets a message it executes the task the message describes. Therefore, to distribute your tasks among multiple computers, all you have to do is start a Celery worker on each machine that listens to the message queue on your main machine. The result backend must likewise be reachable from every machine, so point it at the host running Redis rather than at localhost. You can configure the workers as follows:

      app = Celery('tasks', backend='redis://<ip of result backend host>', broker='pyamqp://<username>:<password>@<ip of task queue host>')
      

      Make sure each Celery worker has a copy of the tasks file, because the message passed to the worker contains only the task name, not the source code.