Tags: python, python-3.x, queue, python-multithreading

What is the safest way to queue multiple threads originating in a loop?


My script loops through each line of an input file and performs some actions using the string in that line. Since the tasks performed on the lines are independent of each other, I decided to run each one in its own thread so that the script doesn't have to wait for a task to complete before continuing with the loop. The code is given below.


import sys
import threading

threads = []

def myFunction(line, param):
    # Does something with line and param:
    # sends multiple HTTP requests, parses the responses and produces output.
    # Returns nothing.
    pass

param = sys.argv[1]
with open(targets, "r") as listfile:
    for line in listfile:
        print("Starting a thread for:", line)
        t = threading.Thread(target=myFunction, args=(line, param))
        threads.append(t)
        t.start()

I realized that this is a bad idea as the number of lines in the input file grows large: with this code there would be as many threads as there are lines. I researched a bit and figured that queues would be the way to go.

I want to understand the optimal way of using queues for this scenario and if there are any alternatives which I can use.


Solution

  • Queues are one way to do it. The way to use them is to put function parameters on a queue and have worker threads get them from the queue and do the processing.

    The queue size doesn't matter much in this case because reading the next line is fast. In cases where producing the next item is slower, a good rule of thumb is to set the queue size to at least twice the number of threads: that way, if all threads finish processing an item at the same time, each one will have its next item already waiting in the queue.

    To avoid complicating the code, the threads can be made daemonic so that they don't stop the program from finishing after the processing is done; they will be terminated when the main process finishes.

    The alternative is to put a special sentinel item (like None) on the queue, one for each thread, have each thread exit after getting it from the queue, and then join the threads, as sketched after the queue example below.

    For the examples below, the number of worker threads is set using the workers variable.

    Here is an example of a solution using a queue.

    from queue import Queue
    from threading import Thread
    
    # Bounded queue: the file is read at most workers * 2 lines ahead of the workers
    queue = Queue(workers * 2)
    
    def work():
        while True:
            myFunction(*queue.get())
            queue.task_done()
    
    # Daemon threads don't keep the program alive once the main thread is done
    for _ in range(workers):
        Thread(target=work, daemon=True).start()
    
    with open(targets, 'r') as listfile:
        for line in listfile:
            queue.put((line, param))
    queue.join()  # block until every queued item has been processed
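
    Building on the example above, here is a minimal sketch of the sentinel alternative mentioned earlier, reusing the same workers, myFunction, targets and param names. The threads are not daemonic, because the sentinels already guarantee that every thread exits cleanly.

    from queue import Queue
    from threading import Thread
    
    queue = Queue(workers * 2)
    
    def work():
        while True:
            item = queue.get()
            if item is None:  # sentinel: no more work, let the thread exit
                break
            myFunction(*item)
    
    threads = [Thread(target=work) for _ in range(workers)]
    for t in threads:
        t.start()
    
    with open(targets, 'r') as listfile:
        for line in listfile:
            queue.put((line, param))
    
    for _ in range(workers):
        queue.put(None)  # one sentinel per worker thread
    
    for t in threads:
        t.join()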
    

    A simpler solution is to use ThreadPoolExecutor. It is especially simple in this case because the function being called doesn't return anything that needs to be used in the main thread.

    from concurrent.futures import ThreadPoolExecutor
    
    # The executor runs the submitted calls on at most `workers` threads;
    # leaving the `with` block waits for all of them to finish
    with ThreadPoolExecutor(max_workers=workers) as executor:
        with open(targets, 'r') as listfile:
            for line in listfile:
                executor.submit(myFunction, line, param)
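
    If the results were needed in the main thread, ThreadPoolExecutor would still keep things simple. Here is a minimal sketch, assuming a variant of myFunction that returns a value (in the question it returns nothing):

    from concurrent.futures import ThreadPoolExecutor
    
    with open(targets, 'r') as listfile:
        lines = listfile.readlines()
    
    with ThreadPoolExecutor(max_workers=workers) as executor:
        # executor.map yields results in the same order as the inputs
        for line, result in zip(lines, executor.map(myFunction, lines, [param] * len(lines))):
            print(line.rstrip(), result)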
    

    Also, if it's not a problem to have all lines stored in memory, there is a solution which doesn't use anything other than threads. The work is split so that each thread reads some lines from a list and ignores the others. A simple example with two threads is one where the first thread reads the odd lines and the second reads the even lines.

    from threading import Thread
    
    with open(targets, 'r') as listfile:
        lines = listfile.readlines()
    
    def work_split(n):
        # Thread n processes every workers-th line, starting at index n
        for line in lines[n::workers]:
            myFunction(line, param)
    
    threads = []
    for n in range(workers):
        t = Thread(target=work_split, args=(n,))
        t.start()
        threads.append(t)
    
    for t in threads:
        t.join()

    In a quick benchmark, the Queue solution was slightly faster than the ThreadPoolExecutor one, and the split-work solution was faster than both.