
asyncio.Queue in Python: get an element without removing it


In Python, is there a way to check the last element in an asyncio queue without removing it? I have to check an element before getting it.

I tried to treat it like a simple queue, but none of these work: print(self.main_queue[-1]), print(self.main_queue._queue[-1]), or self.main_queue.__dict__['queue'][-1]. I put data in with

self.main_queue.put_nowait(new_json) 

I receive data from an exchange, process it, and then send it to the client. I want to check the data before sending it. Thanks for the help.


Solution

  • There is no way to do this with the existing implementation without accessing its private members.

    However, you can subclass it and add a method that fetches the next element and holds on to it, so that a subsequent get returns that element first and callers still see the data in its original order.

    Some care is needed so that other methods, such as empty and qsize, still work, but it is feasible.

    This should do:

    import asyncio

    class InspectableQueue(asyncio.Queue):
        _sentinel = object()  # marks "no item has been peeked"
        _next = _sentinel     # item pulled out by peek(), returned by the next get

        async def get(self):
            if self._next is self._sentinel:
                return await super().get()
            # Hand out the previously peeked item first.
            value = self._next
            self._next = self._sentinel
            return value

        def get_nowait(self):
            if self._next is self._sentinel:
                return super().get_nowait()
            value = self._next
            self._next = self._sentinel
            return value

        def peek(self, default=_sentinel):
            # Repeated peeks return the same cached item.
            if self._next is not self._sentinel:
                return self._next
            try:
                # Pull the item out of the underlying queue and cache it.
                value = self._next = super().get_nowait()
            except asyncio.QueueEmpty:
                if default is self._sentinel:
                    raise
                return default
            return value

        def empty(self):
            # A cached item still counts as being in the queue.
            if self._next is not self._sentinel:
                return False
            return super().empty()

        def qsize(self):
            value = super().qsize()
            if self._next is not self._sentinel:
                value += 1
            return value
    

    (Just use the .peek() method of the class above to check the next value without popping it.)

    Otherwise, if you don't mind depending on the current implementation of asyncio.Queue, knowing that it might change in any Python release (even a minor update), the internal data is kept in the ._queue attribute, which is a collections.deque instance. You can simply do:

    if not self.main_queue.empty():
        next_value = self.main_queue._queue[0]
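
As a rough sketch of that second approach, the private-attribute access can be wrapped in small helpers so that the dependency on ._queue lives in one place. The names peek_first, peek_last and _MISSING are made up for this illustration and are not part of asyncio. Since the question asks about the last element, peek_last reads the other end of the deque. This assumes a plain FIFO asyncio.Queue; PriorityQueue and LifoQueue store their items differently.

```python
import asyncio

_MISSING = object()  # hypothetical marker meaning "no default was supplied"


def peek_first(queue, default=_MISSING):
    """Return the item the next get() would return, without removing it.

    Assumption: relies on the private ``_queue`` deque of asyncio.Queue,
    which is an implementation detail and may change between versions.
    """
    if queue.empty():
        if default is _MISSING:
            raise asyncio.QueueEmpty
        return default
    return queue._queue[0]


def peek_last(queue, default=_MISSING):
    """Return the most recently added item, without removing it."""
    if queue.empty():
        if default is _MISSING:
            raise asyncio.QueueEmpty
        return default
    return queue._queue[-1]


async def main():
    queue = asyncio.Queue()
    for payload in ({"price": 1}, {"price": 2}, {"price": 3}):
        queue.put_nowait(payload)

    first = peek_first(queue)       # oldest item, next to be returned by get()
    last = peek_last(queue)         # newest item, added by the latest put
    size_after_peek = queue.qsize() # peeking does not change the size
    taken = await queue.get()       # the same item that peek_first() returned
    return first, last, size_after_peek, taken


if __name__ == "__main__":
    print(asyncio.run(main()))
```

There is no await between the empty() check and the indexing, so no other coroutine on the same event loop can drain the queue in between. Another consumer can still take the item later, so a peeked value is only a snapshot.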