Search code examples
python-asyncio

Asyncio: batched queue processor


I am trying to build a batched queue processor with asyncio. The way it should work is that one should be able to push individual requests into the queue and that the queue processor would batch these up based on some heuristic, process the batch and that it would then release the results back to the calling tasks. That each calling task should be blocked/awaited until it's result is available.

Here is what I got to so far:

import asyncio
from typing import TypedDict, List, Any, Dict

class Event(TypedDict):
    id: int
    payload: Any

count_tokens = lambda content: len(str(content))

class BatchProcessor:
    queue: asyncio.Queue[Event] = asyncio.Queue()
    lock = asyncio.Lock()
    batch_processed = asyncio.Event()
    results: Dict[int, Any] = {}
    counter: int = 0

    def __init__(self, tokens_per_batch = 100) -> None:
        self.queue = asyncio.Queue()
        self.tokens_per_batch = tokens_per_batch
        self.processor_task = asyncio.create_task(self.processor())
        
    async def next_counter(self):
        async with self.lock:
            self.counter += 1
            return self.counter

    async def processor(self) -> None:
        batch: List[Event] = []
        tokens = 0
        while True:
            event = await self.queue.get()
            batch.append(event)
            tokens += count_tokens(event)
            print(f"Tokens: {tokens}")
            if tokens > self.tokens_per_batch:
                print(f"Triggering batch of {len(batch)} items")
                self.results.update(await self.process_batch(batch))
                print(f"Notifying batch processed event for {self.results} items")
                self.batch_processed.set()
                batch = []
                tokens = 0
                
    async def process_batch(self, batch: List[Any]) -> Dict[int, Any]:
        print(f"Processing batch of {len(batch)} items")
        await asyncio.sleep(1)
        return {event.id: f"{event.payload} processed" for event in batch}
    
    async def add_to_batch(self, payload: Any ) -> None:
        id = await self.next_counter()
        self.queue.put_nowait(Event(id=id, payload=payload))
        # wait for event
        print(f"Added request {id} to batch")
        while True:
            await self.batch_processed.wait()
            print(f"Checking for result of request {id}")
            if id in self.results:
                result = self.results.pop(id)
                print(f"Result for request {id}: {result}")
                return result

async def main():
    batch_processor = BatchProcessor(tokens_per_batch=100)

    # Simulating client requests
    coros = [batch_processor.add_to_batch(f"data_{i}") for i in range(1, 11)]

    # Wait for all requests to be processed
    results = await asyncio.gather(*coros)

    print("Results:", results)
    

# Run the main function
asyncio.run(main())

Producing this output (and then getting stuck):

Added request 1 to batch
Added request 2 to batch
Added request 3 to batch
Added request 4 to batch
Added request 5 to batch
Added request 6 to batch
Added request 7 to batch
Added request 8 to batch
Added request 9 to batch
Added request 10 to batch
Tokens: 30
Tokens: 60
Tokens: 90
Tokens: 120
Triggering batch of 4 items
Processing batch of 4 items

Although I have worked with asyncio for a little bit, I am realizing that my understanding is severely lacking. For instance, it appears that the process_batch never returns. Also, I am unsure where and when the batch_processed event should be cleared again.

Next, I was planning on adding a flush method that would force the remaining events in the batch to get processed, but first am need to get the basics working.

I would be most grateful for any hints.


Solution

  • The asyncio has a special object for awaiting a result. It's the Future and plays a key role in the asyncio. Despite considered low-level, I think it is exactly what you were asking for when you wrote "calling task should be blocked/awaited until it's result is available".

    Below is a working example. Sorry for using different terminology. Submitting a job returns a Future. The jobs are stored in a list until a batch is run. The batch processor is a long running task (until explicitly stopped). It is controlled by commands sent via a queue.

    import asyncio, enum
    
    _Command = enum.Enum("_Command", "FLUSH STOP NEWJOB")
    
    class BatchProcessor:
        def __init__(self):
            self._jobs = []
            self._queue = asyncio.Queue()
            self._task = asyncio.create_task(self._processor())
    
        def submit_job(self, payload) -> asyncio.Future:
            # TODO: check if stop() was not called (in other functions too)
            self._jobs.append((payload, future_result := asyncio.Future()))
            self._queue.put_nowait(_Command.NEWJOB)
            return future_result
    
        def flush(self):
            self._queue.put_nowait(_Command.FLUSH)
    
        async def stop(self):
            self._queue.put_nowait(_Command.STOP)
            await self._task
            self._task = None
    
        async def _run_batch(self, jobs):
            print(f"batch run with {len(jobs)} jobs")
            for payload, future_result in jobs:
                result = f"done: {payload}"
                future_result.set_result(result)
            # TODO: future_result.set_exception on error
    
        def _condition(self) -> bool:
            # when to run a batch?
            return len(self._jobs) >= 5
    
        async def _processor(self):
            while True:
                cmd = await self._queue.get()
                if (cmd in (_Command.FLUSH, _Command.STOP)
                    or cmd is _Command.NEWJOB and self._condition()
                    ):
                    jobs, self._jobs = self._jobs, []
                    await self._run_batch(jobs)
                if cmd is _Command.STOP:
                    return
    
    async def test_task(bp, i):
        print(f"submitting job {i}")
        result = await bp.submit_job(i)
        print(f"got {result}")
    
    async def main():
        bp = BatchProcessor()
        for i in range(13): 
            asyncio.create_task(test_task(bp, i))
            await asyncio.sleep(0.5)
        await bp.stop()
    
    if __name__ == "__main__":
        asyncio.run(main())