
Skip loop iteration if it exceeds some time limit


I have this loop:

for index, row in df.iterrows():
   process_row(index, row)

where process_row is a function that calls an API twice:

def process_row(index, row):
    print("Evaluating row index:", index)
    question = row["Question"]
    answer = row["Answer"]
    instruct = "..."
    instruct2 = "..."

    try:
        completion = openai.ChatCompletion.create(
            model="gpt-3.5-turbo", messages=[{"role": "user", "content": instruct}]
        )    
        response = completion["choices"][0]["message"]["content"]

        completion = openai.ChatCompletion.create(
            model="gpt-3.5-turbo", messages=[{"role": "user", "content": instruct2}]
        )
        response2 = completion["choices"][0]["message"]["content"]

        # .... OTHER CODE ....
    except Exception as e:
        print(e)

If the whole function takes more than 30 seconds for one iteration, I want to skip it and run this instead:

min_vote = 10
row_with_vote = row.tolist() + [min_vote]
passed_writer.writerow(row_with_vote)

How can I do this? I tried something with concurrent.futures but saw no improvement; I can add that attempt to the post if it helps. Other posts I have seen check the elapsed time after every instruction, which I'm fairly sure would not help here because the program gets stuck on a single line. Also, what could make the function this slow? Most iterations take just a couple of seconds, but occasionally one takes 10 minutes or more, so something is going wrong.


Solution

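  • Pulling from this answer, try using the standard-library signal module to define a timeout. Note that signal.SIGALRM is only available on Unix, and signal handlers can only be installed from the main thread.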

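    import signal
    
    class TimeoutException(Exception):
        """Raised by the SIGALRM handler when the time limit is hit."""
    
    def signal_handler(signum, frame):
        raise TimeoutException("timeout function")
        
    def long_function_call():
        while True:
            pass
    
    # SIGALRM is Unix-only and must be set up in the main thread
    signal.signal(signal.SIGALRM, signal_handler)
    signal.alarm(3)   # Three seconds
    try:
        long_function_call()
    except TimeoutException:
        print("Timed out!")
    finally:
        signal.alarm(0)   # Cancel any pending alarm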
    

    So your code could look something like this:

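    import signal
    import time
    import pandas as pd
    import csv
    
    class TimeoutException(Exception):
        """Raised by the SIGALRM handler when the time limit is hit."""
    
    # dummy function
    def process_row(index, row):
        time.sleep(index)
        print(f"Processed index {index}")
        
    # dummy data
    df = pd.DataFrame(columns=["a"], index=range(10))
        
    def signal_handler(signum, frame):
        raise TimeoutException("timeout function")
    
    # Install the handler once; SIGALRM is Unix-only, main thread only
    signal.signal(signal.SIGALRM, signal_handler)
        
    with open("./tmpcsv.csv", "w", newline="") as f:
        writer = csv.writer(f)
        for index, row in df.iterrows():
            signal.alarm(5)   # 5 second timeout
            try:
                process_row(index, row)
            except TimeoutException:
                print("Timed out!")
                writer.writerow(row)
            finally:
                signal.alarm(0)   # Cancel the alarm if the row finished in time
    
    Output:
    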
    Processed index 0
    Processed index 1
    Processed index 2
    Processed index 3
    Processed index 4
    Timed out!
    Timed out!
    Timed out!
    Timed out!
    Timed out!
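
One caveat when applying this to the process_row from the question: that function wraps its body in `try: ... except Exception`, so an exception raised by the signal handler that derives from `Exception` would be caught and printed inside process_row and would never reach the loop. A minimal sketch of one way around this, assuming a Unix system and the main thread, is to raise a class derived from `BaseException` and to wrap the alarm in a context manager. The names `RowTimeout`, `time_limit`, `slow_row` and `run_with_fallback` are hypothetical and only serve the illustration; `slow_row` stands in for the real process_row.

```python
import signal
import time
from contextlib import contextmanager


class RowTimeout(BaseException):
    """Hypothetical name. Derives from BaseException so that a broad
    `except Exception` inside the timed code does not swallow it."""


def _raise_timeout(signum, frame):
    raise RowTimeout()


@contextmanager
def time_limit(seconds):
    """Raise RowTimeout if the block runs longer than `seconds` (Unix, main thread)."""
    previous = signal.signal(signal.SIGALRM, _raise_timeout)
    signal.setitimer(signal.ITIMER_REAL, seconds)  # accepts fractional seconds
    try:
        yield
    finally:
        signal.setitimer(signal.ITIMER_REAL, 0)    # cancel the pending timer
        signal.signal(signal.SIGALRM, previous)    # restore the previous handler


def slow_row(delay):
    """Stand-in for process_row, including its broad try/except."""
    try:
        time.sleep(delay)
        return "done"
    except Exception as e:
        print(e)


def run_with_fallback(delay, limit):
    try:
        with time_limit(limit):
            return slow_row(delay)
    except RowTimeout:
        # In the question, this is where min_vote = 10 would be written out.
        return "fallback"
```

In the real loop, the body of the `except RowTimeout` branch would build `row.tolist() + [min_vote]` and pass it to `passed_writer.writerow`, as described in the question.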