Search code examples
pythonconcurrencygevent

gevent - Pass a value to many greenlets in parallel


I'm trying to implement a simple gevent setup. There is a sender that should send a value to several waiters in parallel. The Event class gets the closest to solving this, shown below.

Every three seconds, the setter creates an event, which unblocks all the waiters. The event is cleared right after, so the waiters block again until the next time.

import gevent
from gevent.event import Event

evt = Event()

def setter():
    '''After 3 seconds, wake all threads waiting on the value of evt'''
    while True:
        gevent.sleep(3)
        evt.set()
        evt.clear()

def waiter(arg):
    while True:
        evt.wait()
        print("waiter {}".format(arg))

def main():
    gevent.joinall([
        gevent.spawn(setter),
        gevent.spawn(waiter,1),
        gevent.spawn(waiter,2),
        gevent.spawn(waiter,3),
    ])

if __name__ == '__main__': main()

Now, I only need to do this, with the addition of passing a value to the waiters. The obvious choice would be to use an AsyncResult. Yet, it is not possible to clear the AsyncResult object, so the waiters end up in an infinite loop.

Do you have any ideas how to implement this?


Solution

  • I think your best bet is to use queues for this. I created a BroadcastQueue class which makes it a little easier to manage sending one value to many consumers. The producer calls BroadcastQueue.broadcast(), which will send a value to all the registered consumers. Consumers register by calling BroadcastQueue.register, which returns a unique gevent.queue.Queue() object. The consumers then use that object to get messages from the producer.

    import gevent
    from gevent.queue import Queue
    
    
    class BroadcastQueue(object):
        def __init__(self):
            self._queues = []
    
        def register(self):
            q = Queue()
            self._queues.append(q)
            return q
    
        def broadcast(self, val):
            for q in self._queues:
                q.put(val)
    
    
    def setter(bqueue):
        '''After 3 seconds, wake all threads waiting on the value of evt'''
        while True:
            gevent.sleep(3)
            bqueue.broadcast("hi")
    
    def waiter(arg, bqueue):
        queue = bqueue.register()
        while True:
            val = queue.get()
            print("waiter {} {}".format(arg, val))
    
    def main():
        bqueue = BroadcastQueue()
        gevent.joinall([
            gevent.spawn(setter, bqueue),
            gevent.spawn(waiter, 1, bqueue),
            gevent.spawn(waiter, 2, bqueue),
            gevent.spawn(waiter, 3, bqueue),
        ])
    
    if __name__ == '__main__':
        main()
    

    Output:

    waiter 1 hi
    waiter 2 hi
    waiter 3 hi
    waiter 1 hi
    waiter 2 hi
    waiter 3 hi
    waiter 1 hi
    waiter 2 hi
    waiter 3 hi
    waiter 1 hi
    waiter 2 hi
    waiter 3 hi
    waiter 1 hi
    waiter 2 hi
    waiter 3 hi