Tags: python, process, multiprocessing, python-multiprocessing, large-data

Python multiprocessing queue makes code hang with large data


I'm using Python's multiprocessing to analyse some large texts. After several days of trying to figure out why my code was hanging (i.e. the processes never ended), I was able to reproduce the problem with the following simple code:

import multiprocessing as mp

for y in range(65500, 65600):
    print(y)

    def func(output):
        # Put a single string of y characters on the queue.
        output.put("a" * y)

    if __name__ == "__main__":
        output = mp.Queue()
        process = mp.Process(target=func, args=(output,))
        process.start()
        # Never returns once y is large enough.
        process.join()

As you can see, if the item put in the queue gets too large, the process just hangs. It doesn't freeze: any code I write after output.put() still runs, but the process never exits.

This starts happening when the string reaches about 65500 characters; the exact threshold may vary with your interpreter.

I was aware that mp.Queue has a maxsize argument, but after some searching I found out that it limits the number of items in the queue, not the size of the items themselves.

Is there a way around this? The data I need to put inside the Queue in my original code is very very large...


Solution

  • Your queue fills up with no consumer to empty it.

    From the definition of Queue.put:

    If the optional argument block is True (the default) and timeout is None (the default), block if necessary until a free slot is available.

    Assuming there is no deadlock possible between producer and consumer (and assuming your original code does have a consumer, since your sample doesn't), the producers should eventually be unblocked and terminate. Check the code of your consumer (or add it to the question, so we can have a look).


    Update

    "This is not the problem, because queue has not been given a maxsize so put should succeed until you run out of memory."

    That is not how Queue behaves. As elaborated in this ticket, the part that blocks here is not the queue itself but the underlying pipe. From the linked resource (inserts between "[]" are mine):

    A queue works like this:
    - when you call queue.put(data), the data is added to a deque, which can grow and shrink forever
    - then a thread pops elements from the deque, and sends them so that the other process can receive them through a pipe or a Unix socket (created via socketpair). But, and that's the important point, both pipes and Unix sockets have a limited capacity (used to be 4k - pagesize - on older Linux kernels for pipes, now it's 64k, and between 64k-120k for Unix sockets, depending on tunable sysctls).
    - when you do queue.get(), you just do a read on the pipe/socket
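The capacity limit described in the quote can be observed directly. The following is a minimal sketch, assuming a Unix-like system where os.set_blocking works on pipe descriptors; the helper name pipe_capacity is hypothetical and not part of multiprocessing. It fills an empty pipe without ever reading from it and counts how many bytes are accepted before a write would block.

```python
import os


def pipe_capacity():
    """Return how many bytes an empty OS pipe accepts before a write would block.

    Hypothetical helper for illustration only.
    """
    read_fd, write_fd = os.pipe()
    # Non-blocking mode: a full pipe raises BlockingIOError instead of hanging.
    os.set_blocking(write_fd, False)
    chunk = b"a" * 512
    total = 0
    try:
        while True:
            try:
                total += os.write(write_fd, chunk)
            except BlockingIOError:
                # The pipe is full and nobody is reading from it.
                return total
    finally:
        os.close(read_fd)
        os.close(write_fd)


if __name__ == "__main__":
    # The value depends on the operating system and its configuration.
    print(pipe_capacity())
```

A queue item also carries some pickling and framing overhead, which would be consistent with the hang in the question starting slightly below the 64k mentioned in the quote.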

    [..] when size [becomes too big] the writing thread blocks on the write syscall. And since a join is performed before dequeuing the item [note: that's your process.join], you just deadlock, since the join waits for the sending thread to complete, and the write can't complete since the pipe/socket is full! If you dequeue the item before waiting for the submitter process, everything works fine.
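Applied to the code in the question, the last sentence of the quote means calling get() before join(). A minimal sketch of that reordering follows; the function names produce and fetch_large_result are hypothetical, and the size of one million characters is only an example value well above the threshold reported in the question.

```python
import multiprocessing as mp


def produce(output, size):
    # Child process: put one large string on the queue.
    output.put("a" * size)


def fetch_large_result(size):
    """Start a child, read its result, then join it (hypothetical helper)."""
    output = mp.Queue()
    process = mp.Process(target=produce, args=(output, size))
    process.start()
    # Read first: this drains the pipe, so the child's feeder thread
    # can finish writing and the child can exit.
    result = output.get()
    # Only now wait for the child to terminate.
    process.join()
    return result


if __name__ == "__main__":
    data = fetch_large_result(1_000_000)
    print(len(data))  # 1000000
```

The only change relative to the question's code is the order of get() and join(); swapping them back reintroduces the hang for large items.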


    Update 2

    "I understand. But I don't actually have a consumer (if it is what I'm thinking it is), I will only get the results from the queue when process has finished putting it into the queue."

    Yeah, this is the problem. The multiprocessing.Queue is not a storage container. You should use it exclusively for passing data between "producers" (the processes that generate the data that enters the queue) and "consumers" (the processes that "use" that data). As you now know, leaving the data there is a bad idea.

    "How can I get an item from the queue if I cannot even put it there first?"

    put and get hide the work of sending the data in pieces and putting it back together when it does not fit in the pipe all at once, so you only need a loop in your "main" process that gets items out of the queue and, for example, appends them to a list. The list lives in the memory space of the main process and does not clog the pipe.
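The loop described above could look like the following sketch. It assumes several worker processes that each put a few large items on a shared queue; the names worker and collect, the None sentinel convention, and the default sizes are illustrative choices, not something taken from the original code.

```python
import multiprocessing as mp


def worker(output, worker_id, n_items, item_size):
    # Producer: put several large items, then a sentinel to signal completion.
    for i in range(n_items):
        output.put((worker_id, i, "x" * item_size))
    output.put(None)


def collect(n_workers=3, n_items=4, item_size=100_000):
    """Drain the queue in the main process, then join the workers."""
    output = mp.Queue()
    processes = [
        mp.Process(target=worker, args=(output, w, n_items, item_size))
        for w in range(n_workers)
    ]
    for p in processes:
        p.start()

    results = []
    finished = 0
    # Keep reading until every worker has sent its sentinel.
    while finished < n_workers:
        item = output.get()
        if item is None:
            finished += 1
        else:
            results.append(item)

    # The queue is empty now, so joining cannot deadlock on a full pipe.
    for p in processes:
        p.join()
    return results


if __name__ == "__main__":
    items = collect()
    print(len(items))  # 12
```

The sentinel is one way for the main process to know when to stop reading; counting the expected number of items would work as well when that number is known in advance.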