Tags: python, celery, coroutine, eventlet

Can celery cooperatively run coroutines as stateful/resumable tasks?


I'm currently investigating Celery for use in a video-processing backend. Essentially, my problem is as follows:

  1. I have a frontend web server that concurrently processes a large number of video streams (on the order of thousands).
  2. Each stream must be processed independently and in parallel.
  3. Stream processing can be divided into two types of operations:
    1. Frame-by-frame operations (computations that do not need information about the preceding or following frame(s))
    2. Stream-level operations (computations that work on a subset of ordered, adjacent frames)

Given point 3, I need to maintain and update an ordered structure of frames throughout the process and farm computations on subsections of this structure to Celery workers. Initially, I thought about organizing things as follows:

[frontend server]  -stream-> [celery worker 1 (greenlet)] --> [celery worker 2 (prefork)]

The idea is that celery worker 1 executes long-running tasks that are primarily I/O-bound. In essence, these tasks would only do the following:

  1. Read a frame from the frontend server
  2. Decode the frame from its base64 representation
  3. Enqueue it in the aforementioned ordered data structure (a collections.deque object, as it currently stands).

Any CPU-bound operations (e.g. image analysis) are shipped off to celery worker 2.
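The I/O-bound loop for worker 1 (steps 1-3 above) can be sketched with the standard library alone; `Frame` and the input source here are stand-ins, since the question doesn't define them:

```python
import base64
from collections import deque


class Frame:
    """Placeholder for the question's frame type."""
    def __init__(self, raw_bytes):
        self.data = raw_bytes


def ingest(source, stream):
    """Read base64-encoded frames from `source`, decode them,
    and enqueue Frame objects on the ordered structure."""
    for line in source:                   # 1. read a frame from the source
        raw = base64.b64decode(line)      # 2. decode its base64 representation
        stream.append(Frame(raw))         # 3. enqueue in order


stream = deque()
encoded = [base64.b64encode(b"frame-%d" % i) for i in range(3)]
ingest(encoded, stream)
print([f.data for f in stream])  # → [b'frame-0', b'frame-1', b'frame-2']
```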

My problem is as follows:

I would like to execute a coroutine as a task such that I have a long-running task from which I can yield, so as to not block celery worker 1's operations. In other words, I'd like to be able to do something akin to:

from collections import deque
from functools import wraps


def coroutine(func):
    @wraps(func)
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)
        next(cr)  # prime the generator to its first yield
        return cr
    return start


@coroutine
def my_task():
    stream = deque()  # ordered, persistent frame structure
    source = MyAsynchronousInputThingy()  # something I'll make myself, probably using select

    while source.open:
        if source.has_data:
            stream.append(Frame(source.readline()))  # read data, build frame and enqueue to persistent structure
        yield  # cooperatively interrupt so that other tasks can execute

Is there a way to make a coroutine-based task run indefinitely, ideally producing results as they are yielded?
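Outside of Celery, the pattern being asked for works with a plain generator driven by next(); whether Celery itself can drive a task this way is exactly what's at issue. A minimal, self-contained sketch (nothing here assumes anything about Celery's internals):

```python
from collections import deque
from functools import wraps


def coroutine(func):
    """Prime a generator so it is paused at its first yield."""
    @wraps(func)
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)
        next(cr)  # advance to the first (bare) yield
        return cr
    return start


@coroutine
def my_task(source):
    stream = deque()
    yield  # priming point: pause here before any input arrives
    for line in source:
        stream.append(line)
        yield stream[-1]  # hand back the newest frame at each cooperative pause


task = my_task(iter([b"a", b"b", b"c"]))
print(next(task), next(task), next(task))  # → b'a' b'b' b'c'
```

Each next() call resumes the task just long enough to ingest one frame and produce one result, which is the "results as they are yielded" behavior the question describes.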


Solution

  • The primary idea behind Eventlet is that you want to write synchronous code, as with threads: socket.recv() should block the current thread until the next statement. This style is very easy to read, maintain and reason about while debugging. To make things effective and scalable, Eventlet does the magic behind the scenes, replacing seemingly blocking code with green threads and epoll/kqueue/etc. mechanisms that wake those green threads up at the proper times.

    So all you need to do is execute eventlet.monkey_patch() as early as possible (e.g. on the second line of your module) and make sure you use pure Python socket operations in MyInputThingy. Forget about asynchronous code; just write normal blocking code as you would with threads.

    Eventlet makes synchronous code good again.
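The blocking style the answer recommends looks like this with ordinary threads and a stdlib socketpair; under `import eventlet; eventlet.monkey_patch()` the same reader code would run on green threads woken by epoll/kqueue instead. The example itself is only an illustration and does not require Eventlet:

```python
import socket
import threading


def reader(conn, out):
    """Plain blocking code: recv() pauses this thread until data arrives.
    With Eventlet's monkey patching in place, recv() would instead yield
    to other green threads and resume when the socket is readable."""
    while True:
        chunk = conn.recv(1024)
        if not chunk:  # empty read means the peer closed the connection
            break
        out.append(chunk)


a, b = socket.socketpair()
frames = []
t = threading.Thread(target=reader, args=(b, frames))
t.start()
a.sendall(b"frame-1")
a.close()   # EOF makes recv() return b"" and the reader exit
t.join()
print(b"".join(frames))  # → b'frame-1'
```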