python-trio

How to structure beginning and ending synchronous calls using trio?


My ask is for structured trio pseudo-code (actual trio function-calls, but dummy worker-does-work-here fill-in) so I can understand and try out good flow-control practices for switching between synchronous and asynchronous processes.

I want to do the following...

  • load a file of json-data into a data-dict
    • aside: the data-dict looks like { 'key_a': {info_dict_a}, 'key_b': {info_dict_b} }
  • have each of n-workers...
    • access that data-dict to find the next record-to-process info-dict
    • prepare some data from the record-being-processed and post the data to a url
    • process the post-response to update a 'response' key in the record-being-processed info-dict
    • update the data-dict with the key's info-dict
    • overwrite the original file of json-data with the updated data-dict

Aside: I know there are other ways I could achieve my overall goal than the clunky repeated rewrite of a json file -- but I'm not asking for that input; I really would like to understand trio well enough to be able to use it for this flow.

So, the processes that I want to be synchronous:

  • getting the next record-to-process info-dict
  • the updating of the data-dict
  • the overwriting of the original file of json-data with the updated data-dict

I'm new to trio. I have working code here, which I believe gets the next record-to-process synchronously (by using a trio.Semaphore() technique), but I'm pretty sure I'm not saving the file synchronously.

Learning Go a few years ago, I felt I grokked the approaches to interweaving synchronous and asynchronous calls -- but am not there yet with trio. Thanks in advance.


Solution

  • This code uses channels to multiplex requests to and from a pool of workers. I noticed the additional requirement (in your code comments) that the post-response rate be throttled, so read_entries sleeps after each send.

    from random import random
    import time

    import asks
    import trio

    # Zero-capacity channels: each send waits until a receiver takes the item.
    snd_input, rcv_input = trio.open_memory_channel(0)
    snd_output, rcv_output = trio.open_memory_channel(0)

    async def read_entries():
        # Closing snd_input on exit ends every worker's "async for" loop.
        async with snd_input:
            for key_entry in range(10):
                print("reading", key_entry)
                await snd_input.send(key_entry)
                await trio.sleep(1)  # throttle: hand out at most one entry per second

    async def work(n):
        async for key_entry in rcv_input:
            print(f"w{n} {time.monotonic()} posting", key_entry)
            r = await asks.post(f"https://httpbin.org/delay/{5 * random()}")
            await snd_output.send((r.status_code, key_entry))

    async def save_entries():
        # Single consumer, so results are handled one at a time.
        async for entry in rcv_output:
            print("saving", entry)

    async def main():
        async with trio.open_nursery() as nursery:
            nursery.start_soon(read_entries)
            nursery.start_soon(save_entries)
            # Close snd_output once all workers finish so save_entries can exit.
            async with snd_output:
                async with trio.open_nursery() as workers:
                    for n in range(3):
                        workers.start_soon(work, n)

    trio.run(main)