Tags: python, flask, multiprocessing, joblib

Error parallelizing a for loop inside a Flask app (TypeError: can't pickle function objects)


I have a Flask app and I need to parallelize a for loop inside it. Until now I have been using the method described in https://blog.dominodatalab.com/simple-parallelization/, but when I use it inside the Flask app it fails. Below is a sample code snippet.

I start the app by running the file below.

#!flask/bin/python
from app import app
app.run(host='0.0.0.0',port=5010)

app/__init__.py is as below:

from flask import Flask

app = Flask(__name__)
app.config.from_object('config')
from app import views

And I'm calling the parallelized part from one of the route functions in views.py:

import ast

from flask import render_template, session

from app import app
from app.timeseries_api import AuthorForecaster

@app.route('/kNearest')
def k_nearest():
    print "in func call"
    requested_author_id = session['requested_author_id']
    complete_ts = ast.literal_eval(session['complete_ts'])
    author_tags = ast.literal_eval(session['prof_labels'])
    author_requested = session['author_requested']
    a = AuthorForecaster()
    k_nearest = a.get_nearest_k(requested_author_id, complete_ts, author_tags)
    return render_template("nearest.html",
                           title="Closest neighbours",
                           author_name=author_requested,
                           neighbours=k_nearest)

get_nearest_k() contains the parallel code, which follows this pattern:

from joblib import Parallel, delayed
import multiprocessing

# what are your inputs, and what operation do you want to 
# perform on each input. For example...
inputs = range(10) 
def processInput(i):
    return i * i

num_cores = multiprocessing.cpu_count()

results = Parallel(n_jobs=num_cores)(delayed(processInput)(i) for i in inputs)
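
In my app this same pattern runs inside a class method, so the worker ends up as a nested function. A minimal sketch of that situation (names taken from the traceback below; the real distance computation is elided) is:

from joblib import Parallel, delayed
import multiprocessing

class AuthorForecaster(object):
    def get_nearest_k(self, requested_author_id, complete_ts, author_tags):
        # pickle can only serialize a function by reference to a
        # top-level module name; a function nested inside a method has
        # none, so joblib's up-front pickle.dumps(function) check in
        # delayed() fails
        def processInput(candidate):
            return 0.0  # stands in for the real distance computation

        num_cores = multiprocessing.cpu_count()
        top_1000_from_good_ones = range(10)  # placeholder input list
        distances = Parallel(n_jobs=num_cores)(
            delayed(processInput)(i) for i in top_1000_from_good_ones)
        return distances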

Below is the error I'm getting:

Traceback (most recent call last):
  File "/Users/arun/citations_project/citationswebsite/flask/lib/python2.7/site-packages/flask/app.py", line 1836, in __call__
    return self.wsgi_app(environ, start_response)
  File "/Users/arun/citations_project/citationswebsite/flask/lib/python2.7/site-packages/flask/app.py", line 1820, in wsgi_app
    response = self.make_response(self.handle_exception(e))
  File "/Users/arun/citations_project/citationswebsite/flask/lib/python2.7/site-packages/flask/app.py", line 1403, in handle_exception
    reraise(exc_type, exc_value, tb)
  File "/Users/arun/citations_project/citationswebsite/flask/lib/python2.7/site-packages/flask/app.py", line 1817, in wsgi_app
    response = self.full_dispatch_request()
  File "/Users/arun/citations_project/citationswebsite/flask/lib/python2.7/site-packages/flask/app.py", line 1477, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/Users/arun/citations_project/citationswebsite/flask/lib/python2.7/site-packages/flask/app.py", line 1381, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/Users/arun/citations_project/citationswebsite/flask/lib/python2.7/site-packages/flask/app.py", line 1475, in full_dispatch_request
    rv = self.dispatch_request()
  File "/Users/arun/citations_project/citationswebsite/flask/lib/python2.7/site-packages/flask/app.py", line 1461, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/Users/arun/citations_project/citationswebsite/app/views.py", line 71, in k_nearest
    k_nearest = a.get_nearest_k(requested_author_id, complete_ts, author_tags)
  File "/Users/arun/citations_project/citationswebsite/app/timeseries_api.py", line 206, in get_nearest_k
    distances = Parallel(n_jobs=num_cores)(delayed(processInput)(i) for i in top_1000_from_good_ones)
  File "/Users/arun/citations_project/citationswebsite/flask/lib/python2.7/site-packages/joblib/parallel.py", line 758, in __call__
    while self.dispatch_one_batch(iterator):
  File "/Users/arun/citations_project/citationswebsite/flask/lib/python2.7/site-packages/joblib/parallel.py", line 603, in dispatch_one_batch
    tasks = BatchedCalls(itertools.islice(iterator, batch_size))
  File "/Users/arun/citations_project/citationswebsite/flask/lib/python2.7/site-packages/joblib/parallel.py", line 127, in __init__
    self.items = list(iterator_slice)
  File "/Users/arun/citations_project/citationswebsite/app/timeseries_api.py", line 206, in <genexpr>
    distances = Parallel(n_jobs=num_cores)(delayed(processInput)(i) for i in top_1000_from_good_ones)
  File "/Users/arun/citations_project/citationswebsite/flask/lib/python2.7/site-packages/joblib/parallel.py", line 183, in delayed
    pickle.dumps(function)
  File "/usr/local/Cellar/python/2.7.10_2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/copy_reg.py", line 70, in _reduce_ex
    raise TypeError, "can't pickle %s objects" % base.__name__
TypeError: can't pickle function objects

Solution

  • This is due to the limited function serialization in the standard pickle/multiprocessing machinery: pickle can only serialize a function by reference to a top-level module name, so nested functions, lambdas, and interactively defined functions can't be pickled. joblib's delayed() calls pickle.dumps(function) up front (visible in the traceback), which is what raises the TypeError. An alternative is the pathos library, which serializes with dill and handles these cases.

    import pathos.multiprocessing as mp
    p = mp.Pool(4)  # pool with four worker processes
    p.map(lambda x: x**2, range(10))
    

    Reference: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization
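
    Applied to the code from the question, a minimal sketch might look like the following (assuming pathos is installed; the class and worker names mirror the question, and the worker body is a placeholder for the real distance computation):

    import pathos.multiprocessing as mp

    class AuthorForecaster(object):
        def get_nearest_k(self, requested_author_id, complete_ts, author_tags):
            # a nested worker is fine here: pathos pickles with dill,
            # which can serialize nested functions and lambdas
            def processInput(candidate):
                return 0.0  # placeholder for the real distance computation

            candidates = range(10)  # placeholder for top_1000_from_good_ones
            pool = mp.Pool(4)
            try:
                return pool.map(processInput, candidates)
            finally:
                pool.close()
                pool.join()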