I have a function called using python's multiprocess
library (the mulitprocessing
library doesn't work correctly in Jupyter Notebook). The first iteration of the code works perfectly:
usa_places_folder_id = 'hidden'
def download_cities_geopackage_test(usa_places_folder_id=usa_places_folder_id):
# Make filepath of gpkg to download
city_geopackage_filepath = os.path.join(
'geo_files', 'city_boundaries', 'usa_places_2019.gpkg')
# Get all of the items inside the drive folder
folder_objects = drive.ListFile(
{'q': f"'{usa_places_folder_id}' in parents and trashed=false"}).GetList()
# If the city package is found in the folder, download it
for folder_object in folder_objects:
if folder_object['title'] == 'usa_places_2019.gpkg':
folder_object.GetContentFile(city_geopackage_filepath)
print("Downloaded usa_places_2019.gpkg")
# Set variable to loop if download times out
timeout = True
while timeout:
print("Working on downloading usa_places_2019.gpkg...")
# Create Process with function defined above
p1 = Process(target=download_cities_geopackage_test,
name='Process_download_cities_geopackage')
# Start Process with a time limit
p1.start()
p1.join(timeout=10)
p1.terminate()
if p1.exitcode is None:
# Let the while-loop restart
print(f'{p1} timed out, restarting.\n')
else:
print("usa_places_2019.gpkg has been downloaded!")
timeout = False
Output:
Working on downloading usa_places_2019.gpkg...
Downloaded usa_places_2019.gpkg
usa_places_2019.gpkg has been downloaded!
But because I use the multiprocess
code many times in the entire Jupyter Notebook, I wanted to wrap it in a function to call. The code below is the wrapper function:
# Wrapper function to restart pydrive get-requests
def timeout_wrapper_function(
function_to_wrap,
num_elements_to_return=1,
timeout=60,
use_queue=False
):
"""
Use this function to restart another passed
function if it hangs for more than the allotted
timeout period.
Parameters:
function_to_wrap (function): The name of the
function you are wrapping with this
timeout function.
num_elements_to_return (int): The number of
elements you'll be returning from the queue.
timeout (int): How many seconds you'd like
this function to wait before timing out.
use_queue (Boolean): True/False on whether
there will be items put into a queue
for retrieval later. Only use this if
your function_to_wrap is trying to return
something.
Returns:
result_tuple (tuple): A tuple containing all
elements that you'd like your function_to_wrap
to return.
"""
# Set variable to loop if download times out
timeout = True
while timeout:
print("Working on fetching data...")
# Instantiate Queue and start process if you are
# using a queue.
if use_queue:
queue = Queue()
# Create Process with function defined above,
# with a queue
p1 = Process(target=function_to_wrap,
name=f'Process_{function_to_wrap}',
args=(queue,))
else:
# Create Process with function defined above,
# without a queue
p1 = Process(target=function_to_wrap,
name=f'Process_{function_to_wrap}')
# Start process with a time limit
p1.start()
p1.join(timeout=timeout)
p1.terminate()
if p1.exitcode is None:
# Let the while-loop restart
print(f'{p1} timed out, restarting.\n')
else:
print("Data has been fetched!")
# Start empty list that will eventually
# be returned as the tuple with data
result_list = []
# If there is a queue, get the items from it
if use_queue:
for i in range(num_elements_to_return):
result_list.append(queue.get())
# Turn result_list into a tuple
result_tuple = tuple(result_list)
# End the while loop
timeout = False
return result_tuple
And finally, below is the code that uses the wrapper function. However, it times out instantly:
usa_places_folder_id = 'hidden'
def download_cities_geopackage_test(usa_places_folder_id=usa_places_folder_id):
# Make filepath of gpkg to download
city_geopackage_filepath = os.path.join(
'geo_files', 'city_boundaries', 'usa_places_2019.gpkg')
# Get all of the items inside the drive folder
folder_objects = drive.ListFile(
{'q': f"'{usa_places_folder_id}' in parents and trashed=false"}).GetList()
# If the city package is found in the folder, download it
for folder_object in folder_objects:
if folder_object['title'] == 'usa_places_2019.gpkg':
folder_object.GetContentFile(city_geopackage_filepath)
print("Downloaded usa_places_2019.gpkg")
timeout_wrapper_function(
function_to_wrap=download_cities_geopackage_test,
num_elements_to_return=0,
timeout=60,
use_queue=False
)
Output:
Working on fetching data...
<Process name='Process_<function download_cities_geopackage_test at 0x5407bfb80>' pid=93723 parent=9165 started> timed out, restarting.
Working on fetching data...
<Process name='Process_<function download_cities_geopackage_test at 0x5407bfb80>' pid=93724 parent=9165 started> timed out, restarting.
Working on fetching data...
<Process name='Process_<function download_cities_geopackage_test at 0x5407bfb80>' pid=93727 parent=9165 started> timed out, restarting.
Working on fetching data...
The multiprocess code repeats itself if the p1.exitcode
is None
. Why does this keep happening in the 2nd example when put through a wrapped function, but works perfectly when not?
User (my) error. My wrapper function was using "timeout" as an argument for number of seconds to wait until timing out:
# Wrapper function to restart pydrive get-requests
def timeout_wrapper_function(
function_to_wrap,
num_elements_to_return=1,
timeout=60,
use_queue=False
):
And then almost immediately in the function, timeout
is set to True
.
So to fix this, all I needed was to update the function parameter to timeout_seconds
:
# Wrapper function to restart pydrive get-requests
def timeout_wrapper_function(
function_to_wrap,
num_elements_to_return=1,
timeout_seconds=60,
use_queue=False
):
And adjust some of the code in the function that uses that parameter.