Search code examples
pythonboto3python-multiprocessingstarmap

How to use boto3 client with Python multiprocessing?


Code looks something like this:

import multiprocessing as mp
from functools import partial

import boto3
import numpy as np


s3 = boto3.client('s3')

def _something(**kwargs):
    # Some mixed integer programming stuff related to the variable archive
    return np.array(some_variable_related_to_archive)


def do(s3):
    archive = np.load(s3.get_object('some_key')) # Simplified -- details not relevant
    pool = mp.pool()
    sub_process = partial(_something, slack=0.1)
    parts = np.array_split(archive, some_int)
    target_parts = np.array(things)

    out = pool.starmap(sub_process, [x for x in zip(parts, target_parts)] # Error occurs at this line

    pool.close()
    pool.join()

do(s3)

The error:

_pickle.PicklingError: Can't pickle <class 'botocore.client.S3'>: attribute lookup S3 on botocore.client failed

I have very limited experience with the Python multiprocessing library. I'm not sure why, when the S3 client is not a parameter in any of the functions, it throws the above error. Note that the code is able to run okay if the archive file is loaded from disk, and not from S3.

Any help/guidance would be greatly appreciated.


Solution

  • Objects passed to mp.starmap() must be pickle-able, and S3 clients are not pickle-able. Bringing the actions of the S3 client outside of the function that calls mp.starmap() can solve the issue:

    import multiprocessing as mp
    from functools import partial
    
    import boto3
    import numpy as np
    
    
    s3 = boto3.client('s3')
    archive = np.load(s3.get_object('some_key')) # Simplified -- details not relevant # Move the s3 call here, outside of the do() function
    
    def _something(**kwargs):
        # Some mixed integer programming stuff related to the variable archive
        return np.array(some_variable_related_to_archive)
    
    
    def do(archive): # pass the previously loaded archive, and not the s3 object into the function
        pool = mp.pool()
        sub_process = partial(_something, slack=0.1)
        parts = np.array_split(archive, some_int)
        target_parts = np.array(things)
    
        out = pool.starmap(sub_process, [x for x in zip(parts, target_parts)] # Error occurs at this line
    
        pool.close()
        pool.join()
    
    do(archive) # pass the previously loaded archive, and not the s3 object into the function