I am trying to build a batched queue processor with asyncio
. The way it should work is that one should be able to push individual requests into the queue and that the queue processor would batch these up based on some heuristic, process the batch and that it would then release the results back to the calling tasks. That each calling task should be blocked/awaited until it's result is available.
Here is what I got to so far:
import asyncio
from typing import TypedDict, List, Any, Dict
class Event(TypedDict):
id: int
payload: Any
count_tokens = lambda content: len(str(content))
class BatchProcessor:
queue: asyncio.Queue[Event] = asyncio.Queue()
lock = asyncio.Lock()
batch_processed = asyncio.Event()
results: Dict[int, Any] = {}
counter: int = 0
def __init__(self, tokens_per_batch = 100) -> None:
self.queue = asyncio.Queue()
self.tokens_per_batch = tokens_per_batch
self.processor_task = asyncio.create_task(self.processor())
async def next_counter(self):
async with self.lock:
self.counter += 1
return self.counter
async def processor(self) -> None:
batch: List[Event] = []
tokens = 0
while True:
event = await self.queue.get()
batch.append(event)
tokens += count_tokens(event)
print(f"Tokens: {tokens}")
if tokens > self.tokens_per_batch:
print(f"Triggering batch of {len(batch)} items")
self.results.update(await self.process_batch(batch))
print(f"Notifying batch processed event for {self.results} items")
self.batch_processed.set()
batch = []
tokens = 0
async def process_batch(self, batch: List[Any]) -> Dict[int, Any]:
print(f"Processing batch of {len(batch)} items")
await asyncio.sleep(1)
return {event.id: f"{event.payload} processed" for event in batch}
async def add_to_batch(self, payload: Any ) -> None:
id = await self.next_counter()
self.queue.put_nowait(Event(id=id, payload=payload))
# wait for event
print(f"Added request {id} to batch")
while True:
await self.batch_processed.wait()
print(f"Checking for result of request {id}")
if id in self.results:
result = self.results.pop(id)
print(f"Result for request {id}: {result}")
return result
async def main():
batch_processor = BatchProcessor(tokens_per_batch=100)
# Simulating client requests
coros = [batch_processor.add_to_batch(f"data_{i}") for i in range(1, 11)]
# Wait for all requests to be processed
results = await asyncio.gather(*coros)
print("Results:", results)
# Run the main function
asyncio.run(main())
Producing this output (and then getting stuck):
Added request 1 to batch
Added request 2 to batch
Added request 3 to batch
Added request 4 to batch
Added request 5 to batch
Added request 6 to batch
Added request 7 to batch
Added request 8 to batch
Added request 9 to batch
Added request 10 to batch
Tokens: 30
Tokens: 60
Tokens: 90
Tokens: 120
Triggering batch of 4 items
Processing batch of 4 items
Although I have worked with asyncio
for a little bit, I am realizing that my understanding is severely lacking. For instance, it appears that the process_batch
never returns. Also, I am unsure where and when the batch_processed
event should be cleared again.
Next, I was planning on adding a flush
method that would force the remaining events in the batch to get processed, but first am need to get the basics working.
I would be most grateful for any hints.
The asyncio has a special object for awaiting a result. It's the Future
and plays a key role in the asyncio. Despite considered low-level, I think it is exactly what you were asking for when you wrote "calling task should be blocked/awaited until it's result is available".
Below is a working example. Sorry for using different terminology. Submitting a job returns a Future. The jobs are stored in a list until a batch is run. The batch processor is a long running task (until explicitly stopped). It is controlled by commands sent via a queue.
import asyncio, enum
_Command = enum.Enum("_Command", "FLUSH STOP NEWJOB")
class BatchProcessor:
def __init__(self):
self._jobs = []
self._queue = asyncio.Queue()
self._task = asyncio.create_task(self._processor())
def submit_job(self, payload) -> asyncio.Future:
# TODO: check if stop() was not called (in other functions too)
self._jobs.append((payload, future_result := asyncio.Future()))
self._queue.put_nowait(_Command.NEWJOB)
return future_result
def flush(self):
self._queue.put_nowait(_Command.FLUSH)
async def stop(self):
self._queue.put_nowait(_Command.STOP)
await self._task
self._task = None
async def _run_batch(self, jobs):
print(f"batch run with {len(jobs)} jobs")
for payload, future_result in jobs:
result = f"done: {payload}"
future_result.set_result(result)
# TODO: future_result.set_exception on error
def _condition(self) -> bool:
# when to run a batch?
return len(self._jobs) >= 5
async def _processor(self):
while True:
cmd = await self._queue.get()
if (cmd in (_Command.FLUSH, _Command.STOP)
or cmd is _Command.NEWJOB and self._condition()
):
jobs, self._jobs = self._jobs, []
await self._run_batch(jobs)
if cmd is _Command.STOP:
return
async def test_task(bp, i):
print(f"submitting job {i}")
result = await bp.submit_job(i)
print(f"got {result}")
async def main():
bp = BatchProcessor()
for i in range(13):
asyncio.create_task(test_task(bp, i))
await asyncio.sleep(0.5)
await bp.stop()
if __name__ == "__main__":
asyncio.run(main())