Tags: python-3.x, amazon-web-services, python-multiprocessing, aws-batch

Multiprocessing.Pool: cannot iterate over IMapIterator object in AWS Batch because of PicklingError


I need to request a huge bulk of data from an API endpoint, and I want to use multiprocessing (rather than multithreading, due to company framework limitations).

I have a multiprocessing.Pool with a predefined concurrency inside a class called Batcher. The class looks like this:

import requests
from multiprocessing import Pool as MPool

# API_ENDPOINT is a module-level constant defined elsewhere in the real script

class Batcher:
    def __init__(self, concurrency: int = 8):
        self.concurrency = concurrency

    def _interprete_response_to_succ_or_err(self, resp) -> str:
        # resp is either a requests.Response or an error string from _fetch_dat_data
        if isinstance(resp, str):
            if "Error:" in resp:
                return "dlq"
            else:
                return "err"
        if isinstance(resp, requests.Response):
            if resp.status_code == 200:
                return "succ"
            else:
                return "err"

    def _fetch_dat_data(self, id: str) -> requests.Response:
        try:
            resp = requests.get(API_ENDPOINT)
            return resp
        except Exception as e:
            return f"ID {id} -> Error: {str(e)}"

    def _dispatch_batch(self, batch: list):
        pool = MPool(self.concurrency)
        # imap returns a lazy IMapIterator; pickling errors raised during task
        # submission only surface once the iterator is consumed in start()
        results = pool.imap(self._fetch_dat_data, batch)
        pool.close()
        pool.join()
        return results

    def _run_batch(self, id):
        return self._dispatch_batch(id)

    def start(self, id_list: list):
        """ In real class, this function will create smaller
        batches from bigger chunks of data """
        results = self._run_batch(id_list)
        print(
            [
                res.text
                for res in results
                if self._interprete_response_to_succ_or_err(res) == "succ"
            ]
        )

This class is called from the main file like this:

if __name__ == "__main__":
    """
    the source of ids is a csv file with single column in s3 that contains list
    of columns with single id per line
    """
    id_list = boto3_get_object_body(my_file_name).decode().split("\n") # custom function, works
    batcher = Batcher()
    batcher.start(id_list)

This script is part of an AWS Batch job that is triggered via the CLI. The same code runs perfectly on my local machine in the same environment as in AWS Batch. In the Batch job it throws

_pickle.PicklingError: Can't pickle <class 'boto3.resources.factory.s3.ServiceResource'>: attribute lookup s3.ServiceResource on boto3.resources.factory failed 

on the line where I try to iterate over the IMapIterator object results produced by pool.imap().

Relevant Traceback:

    for res in results
  File "/usr/local/lib/python3.9/multiprocessing/pool.py", line 870, in next
    raise value
  File "/usr/local/lib/python3.9/multiprocessing/pool.py", line 537, in _handle_tasks
    put(task)
  File "/usr/local/lib/python3.9/multiprocessing/connection.py", line 211, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/local/lib/python3.9/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <class 'boto3.resources.factory.s3.ServiceResource'>: attribute lookup s3.ServiceResource on boto3.resources.factory failed

I am wondering whether I am missing something blatantly obvious, or whether this issue is related to the EC2 instance spun up by the Batch job at this point. I would appreciate any kind of lead for root cause analysis.


Solution

  • This error happens because multiprocessing could not import the relevant datatype in the new process it started, either while duplicating data for the workers or while calling the target function. Pickle serializes classes and functions by reference, i.e. by module path and qualified name, and the receiving process re-imports them from there. This usually fails when an object the target function needs is created someplace the child process does not know about (for example, a class created inside the if __name__ == ... block of the main module), or when the object's __qualname__ property has been fiddled with (you might see this with something similar to functools.wraps, or monkey-patching in general).
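    As a minimal sketch of that failure mode (an artificial Dynamic class, not the asker's code): boto3 builds its resource classes dynamically at runtime, so the module/qualname lookup that pickle relies on finds nothing. A hand-made dynamic class reproduces the same PicklingError:

import pickle

# A dynamically created class, analogous to boto3's generated resource classes
Dynamic = type("Dynamic", (), {})
Dynamic.__module__ = "os"  # "os" imports fine, but it has no attribute "Dynamic"

try:
    pickle.dumps(Dynamic())
except pickle.PicklingError as e:
    # Can't pickle <class 'os.Dynamic'>: attribute lookup Dynamic on os failed
    print(e)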

    Therefore, to actually "fix" this, you need to dig into your code and check whether the above is true. A good place to start is the class that is raising the issue (in this case boto3.resources.factory.s3.ServiceResource): can you import it in the main module before the if __name__ ... block runs?

    However, most of the time you can get away with simply reducing the data required to start the target function (less data = fewer chances for faults to occur). In this case, the target function you are calling in the pool is an instance method. To start this function in a new process, multiprocessing needs to pickle the whole instance: every instance attribute, each of which might have instance attributes of its own, and so on. Not only does this add overhead, the problem may also lie in any one of those attributes. Therefore, as a good practice, if your target function can run independently but is currently an instance method, change it to a staticmethod instead.
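    A quick way to see why instance methods are fragile pool targets (a hypothetical Holder class, not the asker's code): pickling a bound method pickles the instance behind it, so a single unpicklable attribute poisons the whole call:

import pickle

class Holder:
    def __init__(self):
        # stand-in for any unpicklable attribute (a client, a session, ...)
        self.handle = open(__file__)

    def work(self, x):
        return x

h = Holder()
try:
    # pickling the bound method also pickles h, including h.handle
    pickle.dumps(h.work)
except TypeError as e:
    print(e)  # cannot pickle '_io.TextIOWrapper' object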

    In this case, this would mean changing _fetch_dat_data to a staticmethod and submitting it to the pool using type(self)._fetch_dat_data instead.
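    A sketch of that change applied to the class from the question (assuming, as in the original post, that API_ENDPOINT is a module-level constant; the URL below is a placeholder):

import requests
from multiprocessing import Pool as MPool

API_ENDPOINT = "https://example.com/api"  # placeholder

class Batcher:
    def __init__(self, concurrency: int = 8):
        self.concurrency = concurrency

    @staticmethod
    def _fetch_dat_data(id: str):
        # no reference to self: the worker only has to unpickle the id
        # string, not the whole Batcher instance
        try:
            return requests.get(API_ENDPOINT)
        except Exception as e:
            return f"ID {id} -> Error: {str(e)}"

    def _dispatch_batch(self, batch: list):
        pool = MPool(self.concurrency)
        results = pool.imap(type(self)._fetch_dat_data, batch)
        pool.close()
        pool.join()
        return results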