Search code examples
pythonmultithreadingqueuepython-multithreadingblockingqueue

Cannot to line up item from one queue to another


I deal with two python queues.
Short description of my issue:
Clients pass through the waiting queue(q1) and they (the clients) are served afterwards.
The size of the waiting queue can't be greater than N (10 in my program).
If waiting queue becomes full, clients pass to outside queue(q2, size 20). If outside queue becomes full, clients are rejected and not served.
Every client that left a waiting queue allows another client from outside queue to join the waiting queue.

Work with queues should be thread-safe.

Below I implemented approximately what I want. But I'm faced with the problem - enqueuing a client from outside queue (q1) to the waiting queue (q2) during execution serve function. I guess I lost or forgot something important. I think this statement q1.put(client) blocks permanently but don't know why.

import time
import threading
from random import randrange
from Queue import Queue, Full as FullQueue


class Client(object):
    def __repr__(self):
        return '<{0}: {1}>'.format(self.__class__.__name__, id(self))


def serve(q1, q2):
    while True:
        if not q2.empty():
            client = q2.get()
            print '%s leaved outside queue' % client
            q1.put(client)
            print '%s is in the waiting queue' % client
            q2.task_done()

        client = q1.get()
        print '%s leaved waiting queue for serving' % client
        time.sleep(2)  # Do something with client
        q1.task_done()


def main():
    waiting_queue = Queue(10)
    outside_queue = Queue(20)

    for _ in range(2):
        worker = threading.Thread(target=serve, args=(waiting_queue, outside_queue))
        worker.setDaemon(True)
        worker.start()

    delays = [randrange(1, 5) for _ in range(100)]

    # Every d seconds 10 clients enter to the waiting queue
    for d in delays:
        time.sleep(d)
        for _ in range(10):
            client = Client()
            try:
                waiting_queue.put_nowait(client)
            except FullQueue:
                print 'Waiting queue is full. Please line up in outside queue.'
                try:
                    outside_queue.put_nowait(client)
                except FullQueue:
                    print 'Outside queue is full. Please go out.'

    waiting_queue.join()
    outside_queue.join()
    print 'Done'

Solution

  • Finally I found the solution. I check docs more attentive If full() returns True it doesn’t guarantee that a subsequent call to get() will not block https://docs.python.org/2/library/queue.html#Queue.Queue.full

    That's why q1.full() is not reliable in a few threads. I added mutex before inserting item to queues and checking queue is full:

    class Client(object):
        def __init__(self, ident):
            self.ident = ident
    
        def __repr__(self):
            return '<{0}: {1}>'.format(self.__class__.__name__, self.ident)
    
    
    def serve(q1, q2, mutex):
        while True:
            client = q1.get()
            print '%s leaved waiting queue for serving' % client
            time.sleep(2)  # Do something with client
            q1.task_done()
    
            with mutex:
                if not q2.empty() and not q1.full():
                    client = q2.get()
                    print '%s leaved outside queue' % client
                    q1.put(client)
                    print '%s is in the waiting queue' % client
                    q2.task_done()
    
    
    def main():
        waiting_queue = Queue(10)
        outside_queue = Queue(20)
    
        lock = threading.RLock()
    
        for _ in range(2):
            worker = threading.Thread(target=serve, args=(waiting_queue, outside_queue, lock))
            worker.setDaemon(True)
            worker.start()
    
        # Every 1-5 seconds 10 clients enter to the waiting room
        i = 1  # Used for unique <int> client's id
        while True:
            delay = randrange(1, 5)
            time.sleep(delay)
            for _ in range(10):
                client = Client(i)
                try:
                    lock.acquire()
                    if not waiting_queue.full():
                        waiting_queue.put(client)
                    else:
                        outside_queue.put_nowait(client)
                except FullQueue:
                    # print 'Outside queue is full. Please go out.'
                    pass
                finally:
                    lock.release()
    
                i += 1
    
        waiting_queue.join()
        outside_queue.join()
        print 'Done'
    

    Now it works well.