I'm currently investigating Celery for use in a video-processing backend. Essentially, my problem is as follows:
Given point 3, I need to maintain and update an ordered structure of frames throughout the process and farm out computations on subsections of this structure to Celery workers. Initially, I thought about organizing things as follows:
[frontend server] -stream-> [celery worker 1 (greenlet)] --> [celery worker 2 (prefork)]
The idea is that celery worker 1 executes long-running tasks that are primarily I/O-bound. In essence, these tasks would only do the following:
collections.deque object, as it currently stands).
Any CPU-bound operations (i.e. image analysis) are shipped off to celery worker 2.
I would like to execute a coroutine as a task, so that I have a long-running task from which I can yield so as not to block celery worker 1's operations. In other words, I'd like to be able to do something akin to:
from collections import deque
from functools import wraps


def coroutine(func):
    """Prime a generator so that it runs up to its first yield."""
    @wraps(func)
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)
        next(cr)  # works on Python 2.6+ and 3; cr.next() is Python 2 only
        return cr
    return start


@coroutine
def my_task():
    stream = deque()  # collections.deque
    source = MyAsynchronousInputThingy()  # something I'll make myself, probably using select
    while source.open:
        if source.has_data:
            # read data, build frame and enqueue to persistent structure
            stream.append(Frame(source.readline()))
        yield  # cooperatively interrupt so that other tasks can execute
Is there a way to make a coroutine-based task run indefinitely, ideally producing results as they are yielded?
The primary idea behind Eventlet is that you write synchronous code, as you would with threads: socket.recv() should block the current thread, and the next statement runs only once it returns. This style is very easy to read, maintain and reason about while debugging. To make things efficient and scalable, Eventlet works behind the scenes to replace seemingly blocking calls with green threads, and uses epoll/kqueue/etc. to wake those green threads up at the proper times.
So all you need to do is execute eventlet.monkey_patch() as early as possible (e.g. on the second line of the module) and make sure you use pure Python socket operations in MyInputThingy. Forget about asynchronous code; just write normal blocking code as you would with threads.
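To illustrate what "normal blocking code" might look like here, this is a minimal sketch using only the standard library. The names read_frames and demo are hypothetical, socket.socketpair() merely stands in for the connection from the frontend server, and plain strings stand in for Frame objects. The Eventlet lines are left as comments so the sketch runs without Eventlet installed; the assumption is that with them enabled, the same socket and threading calls would be swapped for their green equivalents.

```python
# With Eventlet, these two lines would go at the very top of the module:
# import eventlet
# eventlet.monkey_patch()

import socket
import threading
from collections import deque


def read_frames(conn, stream):
    """Blocking reader: append one entry per line until the peer closes."""
    with conn.makefile("r") as lines:
        for line in lines:  # blocks here while waiting for more data
            stream.append(line.rstrip("\n"))


def demo():
    # socketpair() is a stand-in for the frontend connection (assumption).
    producer, consumer = socket.socketpair()
    stream = deque()
    reader = threading.Thread(target=read_frames, args=(consumer, stream))
    reader.start()
    producer.sendall(b"frame-1\nframe-2\nframe-3\n")
    producer.close()  # EOF ends the reader's loop
    reader.join()
    consumer.close()
    return list(stream)
```

Note that there is no yield and no select anywhere in the reader: it simply blocks on the socket, which is the style the answer recommends.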
Eventlet makes synchronous code good again.