Tags: python, python-3.x, multithreading, multiprocessing, pathos

Instance attributes do not persist using multiprocessing


I'm having an issue with instances not retaining changes to attributes, or even keeping new attributes that are created. I think I've narrowed it down to the fact that my script takes advantage of multiprocessing: changes made to instances in separate processes are not 'remembered' once the script returns to the main process.

Basically, I have several sets of data which I need to process in parallel. The data is stored as an attribute and is altered via several methods in the class. At the conclusion of processing, I want to return to the main process and concatenate the data from each of the object instances. However, as described above, when I try to access the instance attribute holding the data after the parallel processing is done, there's nothing there. It's as if any changes made during the multiprocessing step are 'forgotten'.
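
A minimal, self-contained sketch of the symptom (using the stdlib multiprocessing module and a hypothetical Holder class for brevity):

import multiprocessing

class Holder:
    """Hypothetical stand-in for a class that stores results on itself."""
    def __init__(self):
        self.data = None

    def work(self):
        self.data = [1, 2, 3]  # only the child's copy of self is changed

if __name__ == '__main__':
    holder = Holder()
    process = multiprocessing.Process(target=holder.work)
    process.start()
    process.join()
    print(holder.data)  # prints None -- the parent's copy was never touched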

Is there an obvious solution to fix this? Or do I need to rebuild my code to instead return the processed data rather than just altering/storing it as an instance attribute? I guess an alternative solution would be to serialize the data, and then re-read it in when necessary, rather than just keeping it in memory.
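
For what it's worth, here is a rough sketch of that serialize-and-reload alternative (the worker function, DataFrame contents, and temp-file layout are illustrative, not from the real code):

import os
import tempfile
import multiprocessing

import pandas as pd

def worker(out_path):
    # stand-in for the real per-provider processing
    result = pd.DataFrame({'value': [1, 2, 3]})
    result.to_pickle(out_path)  # persist instead of storing on an instance

if __name__ == '__main__':
    tmpdir = tempfile.mkdtemp()
    out_paths = [os.path.join(tmpdir, 'part%d.pkl' % i) for i in range(3)]
    procs = [multiprocessing.Process(target=worker, args=(p,)) for p in out_paths]
    for proc in procs:
        proc.start()
    for proc in procs:
        proc.join()
    # the main process re-reads what each child wrote
    stack = pd.concat(pd.read_pickle(p) for p in out_paths)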

Something worth noting here is that I am using the pathos module rather than Python's multiprocessing module. I was getting some errors pertaining to pickling, similar to those described here: Python multiprocessing PicklingError: Can't pickle <type 'function'>. My code is broken across several modules and, as mentioned, the data processing methods are contained within a class.
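
The reason pathos sidesteps those errors is that it serializes with dill instead of the stdlib pickle, and dill handles objects pickle rejects. A quick illustration (requires dill, which pathos depends on):

import pickle
import dill

f = lambda x: x + 1  # lambdas are a classic thing pickle can't handle

try:
    pickle.dumps(f)
except Exception as exc:
    print('pickle failed:', exc)

print('dill produced %d bytes' % len(dill.dumps(f)))  # works fine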

Sorry for the wall of text.

EDIT: Here's my code:

import importlib
import pandas as pd
from pathos.helpers import mp
from provider import Provider

# list of data providers ... length is arbitrary
operating_providers = ['dataprovider1', 'dataprovider2', 'dataprovider3']


# create provider objects for each operating provider
provider_obj_list = []
for name in operating_providers:
    loc     = 'providers.%s' % name
    module  = importlib.import_module(loc)
    provider_obj = Provider(module)
    provider_obj_list.append(provider_obj)

processes = []
for instance in provider_obj_list:
    process = mp.Process(target=instance.data_processing_func)
    process.daemon = True
    process.start()
    processes.append(process)

for process in processes:
    process.join()

# now that data_processing_func is complete for each set of data, 
# stack all the data
stack = pd.concat((instance.data for instance in provider_obj_list))
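# this fails: each child process worked on its own copy of the instance,
# so the parent's instances never gained the attributes set by
# data_processing_func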

I have a number of modules (their names listed in operating_providers) that contain attributes specific to their data source. These modules are iteratively imported and passed to new instances of the Provider class, which I created in a separate module (provider). I append each Provider instance to a list (provider_obj_list), and then iteratively create separate processes which call the instance method instance.data_processing_func. This function does some data processing (with each instance accessing completely different data files), and creates new instance attributes along the way, which I need to access when the parallel processing is complete.

I tried using multithreading rather than multiprocessing -- in that case, my instance attributes persisted, which is what I want. However, I am not sure why this happens -- I'll have to study the differences between threading and multiprocessing.
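
The short answer to that: threads share the parent process's memory, while each mp.Process operates on its own copy of the instance. A minimal sketch of the threading case (same hypothetical Holder as in the earlier sketch):

import threading

class Holder:
    """Hypothetical stand-in, as in the earlier sketch."""
    def __init__(self):
        self.data = None

    def work(self):
        self.data = [1, 2, 3]  # mutates the one shared object

holder = Holder()
thread = threading.Thread(target=holder.work)
thread.start()
thread.join()
print(holder.data)  # prints [1, 2, 3] -- threads share memory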

Thanks for any help!


Solution

  • Here's some sample code showing how to do what I outlined in a comment. I can't test it because I don't have provider or pathos installed, but it should give you a good idea of what I suggested.

    import importlib
    from pathos.helpers import mp
    from provider import Provider
    
    def process_data(loc):
        module  = importlib.import_module(loc)
        provider_obj = Provider(module)
        provider_obj.data_processing_func()
    
    
    if __name__ == '__main__':
    
        # list of data providers ... length is arbitrary
        operating_providers = ['dataprovider1', 'dataprovider2', 'dataprovider3']
    
        # create list of provider locations for each operating provider
        provider_loc_list = []
        for name in operating_providers:
            loc = 'providers.%s' % name
            provider_loc_list.append(loc)
    
        processes = []
        for loc in provider_loc_list:
            process = mp.Process(target=process_data, args=(loc,))
            process.daemon = True
            process.start()
            processes.append(process)
    
        for process in processes:
            process.join()
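
  • One way to get the processed data back into the parent is to have the worker return it and use a Pool, whose return values are serialized and shipped back automatically. This is a sketch under the same caveat as above (untestable without provider or pathos), and it assumes data_processing_func leaves its result in a data attribute, as described in the question.

    import importlib
    import pandas as pd
    from pathos.helpers import mp
    from provider import Provider

    def process_data(loc):
        module = importlib.import_module(loc)
        provider_obj = Provider(module)
        provider_obj.data_processing_func()
        return provider_obj.data  # sent back to the parent process

    if __name__ == '__main__':

        # list of data providers ... length is arbitrary
        operating_providers = ['dataprovider1', 'dataprovider2', 'dataprovider3']
        provider_locs = ['providers.%s' % name for name in operating_providers]

        with mp.Pool() as pool:
            results = pool.map(process_data, provider_locs)

        # concatenate in the main process, where the results now live
        stack = pd.concat(results)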