Can two or more coroutines await the same Deferred in Twisted?


I am trying to use a Deferred to signal the end of a task to multiple coroutines that may be waiting for it. I want the same behavior as threading.Event. The coroutines wait, but only one gets the result of the fired Deferred. Code example:

from twisted.internet import defer, task, reactor

async def test(d):
    print("Awaiting")
    print(f"Await finished: {await d}")

d = defer.Deferred()
defer.ensureDeferred(test(d))
defer.ensureDeferred(test(d))
task.deferLater(reactor, 1, d.callback, 'Deferred Fired')

reactor.run()

Outputs:

Awaiting
Awaiting
Await finished: Deferred Fired
Await finished: None

I was expecting:

Awaiting
Awaiting
Await finished: Deferred Fired
Await finished: Deferred Fired

But it works fine when one coroutine awaits two or more times:

from twisted.internet import defer, task, reactor

async def test(d):
    print("Awaiting")
    print(f"Await finished: {await d}")
    print(f"Await finished: {await d}")

d = defer.Deferred()
defer.ensureDeferred(test(d))
task.deferLater(reactor, 1, d.callback, 'Deferred Fired')

reactor.run()

Solution

  • You're trying to use Deferreds as a synchronization primitive, but that's not really their intended purpose, at least not on their own. I assume you want to use a Deferred the way you would use Event.wait()? This is the pattern I've usually seen:

    from dataclasses import dataclass, field
    from typing import List

    from twisted.internet import defer, reactor

    @dataclass
    class Thing:
        # Deferreds handed out to waiters that have not fired yet.
        deferred_list: List[defer.Deferred] = field(default_factory=list)

        def notifyFinished(self) -> defer.Deferred:
            # Give each caller its own Deferred and remember it.
            deferred = defer.Deferred()
            self.deferred_list.append(deferred)
            return deferred

        def finish(self):
            # Swap the list out first so a callback that registers a new
            # waiter does not mutate the list being iterated.
            pending, self.deferred_list = self.deferred_list, []
            for index, deferred in enumerate(pending):
                deferred.callback(index + 1)

    async def doSomethingElse(d):
        print("[!] awaiting...")
        print(f"[x] done waiting: {await d}")

    def main():
        thing = Thing()
        # Register five waiters, each awaiting its own Deferred.
        for _ in range(5):
            d = thing.notifyFinished()
            defer.ensureDeferred(doSomethingElse(d))
        # Fire every pending Deferred after five seconds.
        reactor.callLater(5, thing.finish)
        reactor.run()

    main()
    

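    Here one method (notifyFinished) returns a new Deferred to each caller and keeps track of it; once the job is complete, another method (finish) fires every tracked Deferred to signal the end of the job. A Deferred is intended to fire only once (a Future might be too, but don't quote me on that), so this approach gives each waiter its own Deferred, and all of them fire at the same time. Callbacks on a single Deferred also form a chain in which each callback receives the previous callback's return value, which would explain the None the second coroutine sees when both await the same Deferred.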