Tags: python, macos, python-2.7, python-multiprocessing, parallel-python

How can I prevent parallel python from quitting with "OSError: [Errno 35] Resource temporarily unavailable"?


Context

I'm trying to run 1000 simulations that involve (1) damaging a road network and then (2) measuring the traffic delays due to the damage. Both steps (1) and (2) involve creating multiple "maps". In step (1), I create 30 damage maps. In step (2), I measure the traffic delay for each of those 30 damage maps. The function then returns the average traffic delay over the 30 damage maps, and proceeds to run the next simulation. The pseudocode for the setup looks like this:

for i in range(0,1000): # for each simulation
    create 30 damage maps using parallel python
    measure the traffic delay of each damage map using parallel python
    compute the average traffic delay for simulation i

Since the maps are independent of each other, I've been using the parallel python package at each step.
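
In outline, each simulation spins up a pp job server for each step (the real code is shown further down). Here is a minimal sketch of the loop structure, with hypothetical stand-ins damage_map() and measure_delay() in place of the real functions, and with step (2) run inline for brevity:

import pp

def damage_map(j):
    '''hypothetical stand-in for step (1): build one damage map'''
    return j

def measure_delay(dmap):
    '''hypothetical stand-in for step (2): delay for one damage map'''
    return dmap * 0.1

for i in range(0, 1000):                    # for each simulation
    job_server = pp.Server(ppservers=())    # a fresh server per step
    jobs = [job_server.submit(damage_map, (j,)) for j in range(30)]
    maps = [job() for job in jobs]          # collect the 30 damage maps
    delays = [measure_delay(m) for m in maps]
    avg_delay = sum(delays) / float(len(delays))  # result for simulation i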

Problem -- Error Message

The code has twice thrown the following error around the 72nd simulation (of 1000) and stopped running during step (1), which involves damaging the bridges.

An error has occurred during the function execution
Traceback (most recent call last):
  File "/Library/Python/2.7/site-packages/ppworker.py", line 90, in run
    __result = __f(*__args)
  File "<string>", line 4, in compute_damage
  File "<string>", line 3, in damage_bridges
  File "/Library/Python/2.7/site-packages/scipy/stats/__init__.py", line 345, in <module>
    from .stats import *
  File "/Library/Python/2.7/site-packages/scipy/stats/stats.py", line 171, in <module>
    from . import distributions
  File "/Library/Python/2.7/site-packages/scipy/stats/distributions.py", line 10, in <module>
    from ._distn_infrastructure import (entropy, rv_discrete, rv_continuous,
  File "/Library/Python/2.7/site-packages/scipy/stats/_distn_infrastructure.py", line 16, in <module>
    from scipy.misc import doccer
  File "/Library/Python/2.7/site-packages/scipy/misc/__init__.py", line 68, in <module>
    from scipy.interpolate._pade import pade as _pade
  File "/Library/Python/2.7/site-packages/scipy/interpolate/__init__.py", line 175, in <module>
    from .interpolate import *
  File "/Library/Python/2.7/site-packages/scipy/interpolate/interpolate.py", line 32, in <module>
    from .interpnd import _ndim_coords_from_arrays
  File "interpnd.pyx", line 1, in init scipy.interpolate.interpnd
  File "/Library/Python/2.7/site-packages/scipy/spatial/__init__.py", line 95, in <module>
    from .ckdtree import *
  File "ckdtree.pyx", line 31, in init scipy.spatial.ckdtree
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/__init__.py", line 123, in cpu_count
    with os.popen(comm) as p:
OSError: [Errno 35] Resource temporarily unavailable
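
From what I can tell, Errno 35 on macOS is EAGAIN: the traceback ends inside multiprocessing's cpu_count(), where os.popen tries to fork a shell, and fork fails once a resource limit (such as the per-user process cap) is exhausted. The relevant limits can be inspected from Python with only the standard resource module:

import resource

# soft/hard caps that, once hit, make fork() and friends fail with EAGAIN
for label, limit in [('max user processes', resource.RLIMIT_NPROC),
                     ('max open files', resource.RLIMIT_NOFILE)]:
    soft, hard = resource.getrlimit(limit)
    print label, '-> soft:', soft, 'hard:', hard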

Systems and versions

I'm running Python 2.7 in a PyCharm virtual environment with parallel python (pp) 1.6.5. My computer runs macOS High Sierra 10.13.3 and has 8 GB of 1867 MHz DDR3 memory.

Attempted fixes

I gather that the problem lies with the parallel python package or the way I've used it, but I'm otherwise at a loss as to how to fix it. The issue has been noted as a bug on the parallel python page -- wkerzendorf posted there:

Q: I get a Socket Error/Memory Error when using jobs that use os.system calls

A: The fix I found is using subprocess.Popen and piping the stdout, stderr into subprocess.PIPE. Here is an example: subprocess.Popen(['ls -rtl'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True). That fixed the error for me.

However, I wasn't at all sure where to make this modification.
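
For reference, that substitution would look like the following (a sketch only, shown here with the argument-list form rather than shell=True, which avoids spawning an extra shell):

import subprocess

# before: os.system('ls -rtl')
proc = subprocess.Popen(['ls', '-rtl'],
                        stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE)
out, err = proc.communicate()  # wait for the command and collect its output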

I've also read that the problem may lie with my system limits, per this Ghost in the Machines blog post. However, when I tried to reconfigure the maximum number of open files and the maximum number of user processes, I got the following message in the terminal:

Could not set resource limits: 1: Operation not permitted
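
One thing that does work without root, at least for the soft limits: a process can raise its own soft limit up to the hard limit via resource.setrlimit. A sketch (whether this helps depends on which limit is actually being exhausted):

import resource

# bump the soft open-files limit toward the hard limit (no sudo needed)
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
if hard == resource.RLIM_INFINITY or hard >= 4096:
    resource.setrlimit(resource.RLIMIT_NOFILE, (4096, hard))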

Code using parallel python

The code I'm working with is rather complicated (it requires multiple input files to run), so I'm afraid I can't provide a minimal, reproducible example here. You can download and run a version of the code at this link.

Below I've included the code for step (1), in which I use parallel python to create the 30 damage maps. The number of workers is 4.

import pp

ppservers = ()    # starting a super cool parallelization
# Creates a job server with an automatically detected number of workers
job_server = pp.Server(ppservers=ppservers)
print "Starting pp with", job_server.get_ncpus(), "workers"

# set up jobs
jobs = []
for i in targets:
    jobs.append(job_server.submit(compute_damage, (lnsas[i%len(lnsas)], napa_dict, targets[i], i%sets, U[i%sets][:] ), modules = ('random', 'math', ), depfuncs = (damage_bridges, )))

# get the results that have already run
bridge_array_new = []
bridge_array_internal = []
indices_array = []
bridge_array_hwy_num = []
for job in jobs:
    (index, damaged_bridges_internal, damaged_bridges_new, num_damaged_bridges_road) = job()
    bridge_array_internal.append(damaged_bridges_internal)
    bridge_array_new.append(damaged_bridges_new)
    indices_array.append(index)
    bridge_array_hwy_num.append(num_damaged_bridges_road)

Additional functions

The compute_damage function looks like this.

def compute_damage(scenario, master_dict, index, scenario_index, U):
    '''goes from a ground-motion intensity map to a damage map'''
    # figure out component damage for each ground-motion intensity map
    damaged_bridges_internal, damaged_bridges_new, num_damaged_bridges = damage_bridges(scenario, master_dict, scenario_index, U)  # e.g., [1, 89, 598]; num_damaged_bridges counts highway bridges only
    return index, damaged_bridges_internal, damaged_bridges_new, num_damaged_bridges

The damage_bridges function looks like this.

def damage_bridges(scenario, master_dict, scenario_index, u):
    '''This function damages bridges based on the ground shaking values (demand) and the structural capacity (capacity). It returns two lists (which may be empty) of damaged bridges -- the same bridges, just under two different numbering schemes.'''
    from scipy.stats import norm
    damaged_bridges_new = []
    damaged_bridges_internal = []

    # first, highway bridges and overpasses
    beta = 0.6  # you may want to change this by removing this line and making it a dictionary lookup value 3 lines below
    i = 0  # counter for bridge index
    for site in master_dict.keys():  # 1-1889 in Matlab indices (start at 1)
        lnSa = scenario[master_dict[site]['new_id'] - 1]
        prob_at_least_ext = norm.cdf((1/float(beta)) * (lnSa - math.log(master_dict[site]['ext_lnSa'])), 0, 1)  # to use the moderate damage state instead of the extensive damage state, just change the key name here (see the master_dict description)
        #U = random.uniform(0, 1)
        if u[i] <= prob_at_least_ext:
            damaged_bridges_new.append(master_dict[site]['new_id'])  # 1-1743
            damaged_bridges_internal.append(site)  # 1-1889
        i += 1  # increment bridge index

    # GB ADDITION -- to use with master_dict = napa_dict, since napa_dict only has 40 bridges
    num_damaged_bridges = sum([1 for i in damaged_bridges_new if i <= 1743])

    return damaged_bridges_internal, damaged_bridges_new, num_damaged_bridges
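
For context, the damage rule here is a standard lognormal fragility curve: a bridge reaches at least the extensive damage state when a uniform draw u[i] falls below norm.cdf((lnSa - log(median capacity)) / beta). A tiny worked example with made-up numbers:

import math
from scipy.stats import norm

beta = 0.6               # lognormal dispersion of the capacity
ext_Sa = 0.8             # hypothetical median capacity, in g
lnSa = math.log(0.5)     # log of a hypothetical demand of 0.5 g

prob_at_least_ext = norm.cdf((lnSa - math.log(ext_Sa)) / beta)
print prob_at_least_ext  # about 0.22; the bridge is damaged if u[i] <= this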

Solution

  • It seems the issue was that I had neglected to destroy the servers created in steps (1) and (2) -- a simple fix! I added job_server.destroy() at the end of each step. Without that call, every simulation spawned a fresh set of pp worker processes that never exited, so processes accumulated until the OS could no longer fork new ones (Errno 35, i.e. EAGAIN). I'm currently running the simulations and have reached 250 of the 1000 without incident.

    To be completely clear, the code for step (1) is now:

    ppservers = ()    #starting a super cool parallelization
    # Creates jobserver with automatically detected number of workers
    job_server = pp.Server(ppservers=ppservers)
    
    # set up jobs
    jobs = []
    for i in targets:
        jobs.append(job_server.submit(compute_damage, (lnsas[i%len(lnsas)], napa_dict, targets[i], i%sets, U[i%sets][:] ), modules = ('random', 'math', ), depfuncs = (damage_bridges, )))
    
    # get the results that have already run
    bridge_array_new = []
    bridge_array_internal = []
    indices_array = []
    bridge_array_hwy_num = []
    for job in jobs:
        (index, damaged_bridges_internal, damaged_bridges_new, num_damaged_bridges_road) = job()
        bridge_array_internal.append(damaged_bridges_internal)
        bridge_array_new.append(damaged_bridges_new)
        indices_array.append(index)
        bridge_array_hwy_num.append(num_damaged_bridges_road)
    
    job_server.destroy()
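
    A slightly more defensive variant (not what I ran, just a sketch with a toy function) wraps the job collection in try/finally, so the workers are destroyed even if a job raises:

    import pp

    def square(x):
        return x * x

    job_server = pp.Server(ppservers=())
    try:
        jobs = [job_server.submit(square, (j,)) for j in range(30)]
        results = [job() for job in jobs]
    finally:
        job_server.destroy()  # runs even on error, so workers never pile up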