Tags: python, multithreading, queue, python-multithreading

Python script with threading won't finish after queue is empty?


This is my first time using threading, so I apologize in advance: I have little idea what I'm doing.

I had a very long-running script that had to make ~54,000 API calls, pare down the results, and write them to a file. I'm trying to speed it up with threading, since this is an I/O-bound program.

I read through this article, https://towardsdatascience.com/multithreading-multiprocessing-python-180d0975ab29, to try to understand threading and multiprocessing, then tried to retrofit its threading example, which you can find about halfway down the page.

The following code works: using a sample list of only 20 API calls, I got the same results in my outfile as I did before implementing threading (albeit in a different order, of course).

There's only one problem: the script never ends. I can see that it's processed everything in the queue, but it keeps running and never outputs my final print statement telling me how long it took.

Can someone tell me what I missed here? Thanks so much~

import time, datetime, csv, requests, urllib3, socket, os
from queue import Queue
from threading import Thread
import pandas as pd

q = Queue()
NUM_THREADS = 10

### skipping some other code up here that's not relevant to the question (i hope) ###

def get_name_list(infile, header):
    df = pd.read_excel(infile, engine='openpyxl')
    names = df[header].tolist()  # avoid shadowing the built-in list
    return names

def min_query_flattened_ci_rel(name, header='Server Name'):
    ### Working code in this function; I've redacted it because it's not relevant to the question
    ### and I'd have to retype a lot (I can't copy out of my work VDI onto my personal desktop, where I'm writing this post).
    ### This function makes an API call to query something with the provided name,
    ### then pares the result down to a list of dictionaries with the information I need.
    ### Returns a list of dictionaries.
    ...  # placeholder so the snippet parses; real body redacted

def get_sam(outfile, header='Server Name'):
    global q

    while not q.empty():
        print("Queue size: ", q.qsize())
        sam_results = min_query_flattened_ci_rel(q.get(), header=header)
        with open(outfile, 'a') as f:
            writer = csv.writer(f)
            for record in sam_results:  # avoid shadowing the built-in dict
                writer.writerow(record.values())


if __name__ == '__main__':
    start_time = time.time()
    print('{} - Initializing script.'.format(datetime.datetime.now().strftime('%H:%M:%S %m/%d/%Y')))

    sample_name_list = get_name_list('samplefile.xlsx', 'Host Name')

    final_csv_name = 'outputsample.csv'

    with open(final_csv_name, 'a') as f:
        writer = csv.writer(f)
        writer.writerow(['Host Name', 'u_name', 'u_child', 'u_parent', 'u_parent_application', 'u_archer_id'])

    for name in sample_name_list:
        q.put(name)

    for thread in range(NUM_THREADS):
        worker = Thread(target=get_sam, args=(final_csv_name, 'Host Name'))
        worker.daemon = True
        worker.start()

    q.join()

    print('{0} - Completed full script in {1} seconds.'.format((datetime.datetime.now().strftime('%H:%M:%S %m/%d/%Y')), (time.time() - start_time)))

Solution

  • From user2357112's reply: use Queue.task_done(). I tacked it onto the end of my while loop to fix the issue.

    while not q.empty():
        print("Queue size: ", q.qsize())
        sam_results = min_query_flattened_ci_rel(q.get(), header=header)
        with open(outfile, 'a') as f:
            writer = csv.writer(f)
            for record in sam_results:
                writer.writerow(record.values())
        q.task_done()
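
  • Why this works: q.join() blocks until every item that was put() on the queue has had a matching task_done() call. The queue keeps an internal count of unfinished tasks; get() alone never decrements it, so without task_done() the count never reaches zero and q.join() waits forever, even after the queue is empty.

  • Two rough edges remain for anyone adapting this pattern: checking q.empty() and then calling q.get() is a race (another thread can drain the queue in between the two calls), and several threads appending to the same CSV file can interleave their writes. Below is a minimal sketch of a more defensive worker, assuming the same global q and min_query_flattened_ci_rel() as above; the Lock and get_nowait() parts are my additions, not from the original code.

    from queue import Empty
    from threading import Lock

    write_lock = Lock()  # serialize CSV writes across threads

    def get_sam(outfile, header='Server Name'):
        while True:
            try:
                name = q.get_nowait()  # non-blocking get; avoids the empty()/get() race
            except Empty:
                return  # queue drained, let this thread exit
            try:
                sam_results = min_query_flattened_ci_rel(name, header=header)
                with write_lock, open(outfile, 'a', newline='') as f:
                    writer = csv.writer(f)
                    for record in sam_results:
                        writer.writerow(record.values())
            finally:
                q.task_done()  # always mark the item done, even if the API call fails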