Search code examples
pythonpython-asyncio

Asyncio.Queue consumer not getting called


I have an asyncio.Queue producer and consumer running as 2 infinite loops. The producer periodically adds jobs to the queue and the consumer waits until a job is available, then processes it, then waits for the next job.

However, for some reason, my consumer is not getting called. I think this is because the producer task never yields to the consumer?

Any ideas on how to fix it so that both workers run in the background as described?

import asyncio
import concurrent.futures
import time

class Consumer:
    def __init__(self, queue: asyncio.Queue):
        self._duration_before_restart_ms = 3000
        self._queue = queue
        self._last_triggered_time_ms = 0

    async def consumer_loop(self):
        while True:
            print("Consumer new iteration.")
            detected_time_ms = await self._queue.get()
            print("Consumer new event: ", detected_time_ms)
            if (
                detected_time_ms - self._duration_before_restart_ms
                < self._last_triggered_time_ms
            ):
                print("Consumer skipping event: ", detected_time_ms)
                # Invalidate all items in queue that happened before
                # _last_triggered_time_ms.
                continue
            print("Consumer processing event: ", detected_time_ms)
            # Simulate authentication (an io bound operation) with sleep.
            time.sleep(5)
            self._last_triggered_time_ms = int(time.time() * 1000)
            print(
                "Consumer processed event: ",
                detected_time_ms,
                " at: ",
                self._last_triggered_time_ms,
            )


class Producer:
    def __init__(self, queue: asyncio.Queue):
        self._detection_time_period_ms = 3000
        self._last_detection_time_ms = 0
        self._queue = queue

    async def producer_loop(self):
        counter = 0
        while True:
            # Iterates at 2fps
            time.sleep(0.5)
            print("Producer counter: ", counter)
            current_time_ms = int(time.time() * 1000)

            if (counter % 10 > 5) and (
                self._last_detection_time_ms + self._detection_time_period_ms
                < current_time_ms
            ):
                print("Producer adding to queue: ", current_time_ms)
                await self._queue.put(current_time_ms)
                print("Producer added to queue: ", current_time_ms)
                self._last_detection_time_ms = current_time_ms
            counter += 1


async def main():
    q = asyncio.Queue()
    producer = Producer(q)
    consumer = Consumer(q)
    producer_task = asyncio.create_task(producer.producer_loop())
    consumer_task = asyncio.create_task(consumer.consumer_loop())


if __name__ == "__main__":
    asyncio.run(main())

Solution

  • Code works for me if I use await asyncio.sleep() instead of time.sleep()

    In async taks don't run at the same time but it should switch task when it see await - and it seems it needs await asyncio.sleep() to have time to switch from producer to customer, and later to switch back from customer to producer.

    You have await in put() and get() but I can't explain why it doesn't switch tasks. Maybe it swithc but it switch too fast and it had not enough time to send data in queue.


    import asyncio
    import concurrent.futures
    
    class Consumer:
    
        def __init__(self, queue: asyncio.Queue):
            self._duration_before_restart_ms = 3000
            self._queue = queue
            self._last_triggered_time_ms = 0
    
        async def consumer_loop(self):
            print('start consumer')
    
            while True:
                print("Consumer new iteration.")
                detected_time_ms = await self._queue.get()
                print("Consumer new event: ", detected_time_ms)
                if (
                    detected_time_ms - self._duration_before_restart_ms
                    < self._last_triggered_time_ms
                ):
                    print("Consumer skipping event: ", detected_time_ms)
                    # Invalidate all items in queue that happened before
                    # _last_triggered_time_ms.
                    continue
                print("Consumer processing event: ", detected_time_ms)
                # Simulate authentication (an io bound operation) with sleep.
                await asyncio.sleep(5)
                self._last_triggered_time_ms = int(time.time() * 1000)
                print(
                    "Consumer processed event: ",
                    detected_time_ms,
                    " at: ",
                    self._last_triggered_time_ms,
                )
    
    
    class Producer:
    
        def __init__(self, queue: asyncio.Queue):
            self._detection_time_period_ms = 3000
            self._last_detection_time_ms = 0
            self._queue = queue
    
        async def producer_loop(self):
            print('start producer')
    
            counter = 0
            while True:
                # Iterates at 2fps
                await asyncio.sleep(0.5)
                print("Producer counter: ", counter)
                current_time_ms = int(time.time() * 1000)
    
                if (counter % 10 > 5) and (
                    self._last_detection_time_ms + self._detection_time_period_ms
                    < current_time_ms
                ):
                    print("Producer adding to queue: ", current_time_ms)
                    await self._queue.put(current_time_ms)
                    print("Producer added to queue: ", current_time_ms)
                    self._last_detection_time_ms = current_time_ms
                counter += 1
    
    
    async def main():
        q = asyncio.Queue()
        producer = Producer(q)
        consumer = Consumer(q)
        producer_task = asyncio.create_task(producer.producer_loop())
        consumer_task = asyncio.create_task(consumer.consumer_loop())
    
        # wait for end of task
        await asyncio.gather(producer_task)
        
    if __name__ == "__main__":
        asyncio.run(main())