
Python multiprocess keeps "timing out" instantly, but only when the function is wrapped


I have a function that I call using Python's multiprocess library (the standard multiprocessing library doesn't work correctly in Jupyter Notebook). The first version of the code works perfectly:

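import os
from multiprocess import Process

# Assumes drive is an authenticated pydrive GoogleDrive object created earlier in the notebook
usa_places_folder_id = 'hidden'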

def download_cities_geopackage_test(usa_places_folder_id=usa_places_folder_id):
    
    # Make filepath of gpkg to download
    city_geopackage_filepath = os.path.join(
        'geo_files', 'city_boundaries', 'usa_places_2019.gpkg')

    # Get all of the items inside the drive folder
    folder_objects = drive.ListFile(
        {'q': f"'{usa_places_folder_id}' in parents and trashed=false"}).GetList()

    # If the city package is found in the folder, download it
    for folder_object in folder_objects:
        if folder_object['title'] == 'usa_places_2019.gpkg':
            folder_object.GetContentFile(city_geopackage_filepath)
            print("Downloaded usa_places_2019.gpkg")
            

# Set variable to loop if download times out
timeout = True

while timeout:
    print("Working on downloading usa_places_2019.gpkg...")

    # Create Process with function defined above
    p1 = Process(target=download_cities_geopackage_test,
                 name='Process_download_cities_geopackage')

    # Start Process with a time limit
    p1.start()
    p1.join(timeout=10)
    p1.terminate()

    if p1.exitcode is None:
        # Let the while-loop restart
        print(f'{p1} timed out, restarting.\n')
    else:
        print("usa_places_2019.gpkg has been downloaded!")
        timeout = False

Output:

Working on downloading usa_places_2019.gpkg...
Downloaded usa_places_2019.gpkg
usa_places_2019.gpkg has been downloaded!

Because I use this multiprocess pattern many times throughout the Jupyter Notebook, I wanted to wrap it in a reusable function. This is the wrapper function:

from multiprocess import Process, Queue

# Wrapper function to restart pydrive get-requests
def timeout_wrapper_function(
    function_to_wrap,
    num_elements_to_return=1,
    timeout=60,
    use_queue=False
):
    """
    Use this function to restart another passed
    function if it hangs for more than the allotted
    timeout period.
    
    Parameters:
        function_to_wrap (function): The function
            you are wrapping with this timeout
            function.
        num_elements_to_return (int): The number of
            elements you'll be returning from the queue.
        timeout (int): How many seconds you'd like
            this function to wait before timing out.
        use_queue (Boolean): True/False on whether
            there will be items put into a queue
            for retrieval later. Only use this if
            your function_to_wrap is trying to return
            something.
    
    Returns:
        result_tuple (tuple): A tuple containing all
            elements that you'd like your function_to_wrap
            to return.
    
    """
    # Set variable to loop if download times out
    timeout = True

    while timeout:
        print("Working on fetching data...")

        # Instantiate Queue and start process if you are
        # using a queue.
        if use_queue:
            queue = Queue()

            # Create Process with function defined above,
            # with a queue
            p1 = Process(target=function_to_wrap, 
                         name=f'Process_{function_to_wrap}',
                         args=(queue,))
            
        else:
            # Create Process with function defined above, 
            # without a queue
            p1 = Process(target=function_to_wrap, 
                         name=f'Process_{function_to_wrap}')

        # Start process with a time limit
        p1.start()
        p1.join(timeout=timeout)
        p1.terminate()

        if p1.exitcode is None:
            # Let the while-loop restart
            print(f'{p1} timed out, restarting.\n')
        else:
            print("Data has been fetched!")

            # Start empty list that will eventually
            # be returned as the tuple with data
            result_list = []
            
            # If there is a queue, get the items from it
            if use_queue:
                for i in range(num_elements_to_return):
                    result_list.append(queue.get())
                
            # Turn result_list into a tuple
            result_tuple = tuple(result_list)

            # End the while loop
            timeout = False
    
    return result_tuple
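With use_queue=True, the wrapper passes its Queue as the only positional argument and later calls queue.get() num_elements_to_return times, so a compatible function_to_wrap has to accept the queue and put exactly that many items on it. A minimal sketch of such a function follows; fetch_folder_summary and its placeholder values are hypothetical (a real version would call pydrive), and it is exercised here with the standard library's queue.Queue outside any process:

```python
import queue


def fetch_folder_summary(result_queue):
    """Hypothetical function_to_wrap for use_queue=True.

    The wrapper hands over its queue as the only positional argument and
    later calls .get() num_elements_to_return times, so this function puts
    exactly two items (num_elements_to_return=2).
    """
    # Placeholder values standing in for real pydrive results
    file_titles = ["usa_places_2019.gpkg"]
    file_count = len(file_titles)

    # Items come back out of the queue in the order they were put in
    result_queue.put(file_titles)
    result_queue.put(file_count)


# Called directly (no child process) just to show what lands on the queue
q = queue.Queue()
fetch_folder_summary(q)
print(q.get(), q.get())  # ['usa_places_2019.gpkg'] 1
```

Passed to the wrapper as timeout_wrapper_function(fetch_folder_summary, num_elements_to_return=2, use_queue=True), the two items would come back as a two-element tuple in the same order.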

Finally, this is the code that calls the wrapper function. However, it times out instantly:

usa_places_folder_id = 'hidden'

def download_cities_geopackage_test(usa_places_folder_id=usa_places_folder_id):
    
    # Make filepath of gpkg to download
    city_geopackage_filepath = os.path.join(
        'geo_files', 'city_boundaries', 'usa_places_2019.gpkg')

    # Get all of the items inside the drive folder
    folder_objects = drive.ListFile(
        {'q': f"'{usa_places_folder_id}' in parents and trashed=false"}).GetList()

    # If the city package is found in the folder, download it
    for folder_object in folder_objects:
        if folder_object['title'] == 'usa_places_2019.gpkg':
            folder_object.GetContentFile(city_geopackage_filepath)
            print("Downloaded usa_places_2019.gpkg")
            

timeout_wrapper_function(
    function_to_wrap=download_cities_geopackage_test,
    num_elements_to_return=0,
    timeout=60,
    use_queue=False
)

Output:

Working on fetching data...
<Process name='Process_<function download_cities_geopackage_test at 0x5407bfb80>' pid=93723 parent=9165 started> timed out, restarting.

Working on fetching data...
<Process name='Process_<function download_cities_geopackage_test at 0x5407bfb80>' pid=93724 parent=9165 started> timed out, restarting.

Working on fetching data...
<Process name='Process_<function download_cities_geopackage_test at 0x5407bfb80>' pid=93727 parent=9165 started> timed out, restarting.

Working on fetching data...

The loop restarts whenever p1.exitcode is None. Why does this keep happening in the second example, where the code runs through the wrapper function, but not in the first, unwrapped version?


Solution

  • User (my) error. My wrapper function used "timeout" as the parameter for the number of seconds to wait before timing out:

    # Wrapper function to restart pydrive get-requests
    def timeout_wrapper_function(
        function_to_wrap,
        num_elements_to_return=1,
        timeout=60,
        use_queue=False
    ):
    

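    But almost immediately inside the function, timeout is reassigned to True (the loop flag), overwriting the 60 that was passed in. So p1.join(timeout=timeout) effectively becomes p1.join(timeout=1): bool is a subclass of int, so True behaves as 1 and join() returns after about one second. terminate() then kills the process before the download can finish, the exitcode check sees None, and the loop restarts every time. The unwrapped version worked because it passed the literal timeout=10 to join().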

    To fix this, I renamed the parameter to timeout_seconds so it no longer collides with the loop flag:

    # Wrapper function to restart pydrive get-requests
    def timeout_wrapper_function(
        function_to_wrap,
        num_elements_to_return=1,
        timeout_seconds=60,
        use_queue=False
    ):
    

    Then I updated the code in the function that uses it, in particular the join call: p1.join(timeout=timeout_seconds).
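For reference, a minimal sketch of the corrected wrapper. Assumptions, none of which come from the original answer: it uses the standard library's multiprocessing with an explicit "fork" context so it runs as a plain script on Linux/macOS (the question imports Process and Queue from multiprocess, which mirrors this API); it checks p1.is_alive() before terminate() instead of reading exitcode afterwards; it raises if the child exits with an error instead of waiting on an empty queue; and max_attempts is a hypothetical addition so a function that always hangs does not loop forever.

```python
import multiprocessing

# Assumption: stdlib multiprocessing with an explicit "fork" context, so the
# sketch runs as a plain script on Linux/macOS. The question instead uses
# `from multiprocess import Process, Queue`, which mirrors this API.
ctx = multiprocessing.get_context("fork")


def timeout_wrapper_function(
    function_to_wrap,
    num_elements_to_return=1,
    timeout_seconds=60,
    use_queue=False,
    max_attempts=None,  # hypothetical addition; None retries forever
):
    """Run function_to_wrap in a child process, restarting it on timeout."""
    # Loop flag with its own name, so timeout_seconds is never overwritten
    timed_out = True
    attempts = 0

    while timed_out:
        if max_attempts is not None and attempts >= max_attempts:
            raise TimeoutError(f"Gave up after {attempts} attempts")
        attempts += 1
        print("Working on fetching data...")

        # The wrapped function receives the queue only when use_queue=True
        result_queue = ctx.Queue() if use_queue else None
        args = (result_queue,) if use_queue else ()
        p1 = ctx.Process(target=function_to_wrap,
                         name=f"Process_{function_to_wrap.__name__}",
                         args=args)

        p1.start()
        p1.join(timeout=timeout_seconds)  # wait at most timeout_seconds

        if p1.is_alive():
            # Still running after the time limit: stop it, reap it, retry
            p1.terminate()
            p1.join()
            print(f"{p1} timed out, restarting.\n")
        elif p1.exitcode != 0:
            # The child finished but failed; don't block on an empty queue
            raise RuntimeError(f"{p1.name} exited with code {p1.exitcode}")
        else:
            print("Data has been fetched!")
            # Collect the expected number of items, in the order they were put
            result_list = []
            if use_queue:
                for _ in range(num_elements_to_return):
                    result_list.append(result_queue.get())
            timed_out = False

    return tuple(result_list)
```

Called as timeout_wrapper_function(download_cities_geopackage_test, num_elements_to_return=0, timeout_seconds=60), each attempt now gets the full 60 seconds. Like the original, this reads the queue only after join(); the multiprocessing documentation warns that joining a process before draining its queue can block if the function puts large objects on it.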