can't get attribute function when running multiple processes


I wanted to compare the speed when using multiprocessing and a normal linear function.

My code looks like this

from multiprocessing import Pool, Manager
import time
from faker import Faker

num_rows = 1000000

items = [
    ['Column_1', Faker(), "pyint", {}],
    ['Column_2', Faker(), "random_element", {"elements": ["Kayden", "Franklin", "Gabriel", "Vincent"]}],
    ['Column_3', Faker(), "random_element", {"elements": ["Miller", "Ward", "Edwards", "Parry"]}],
    ['Column_4', Faker(), "pyint", {}],
    ['Column_5', Faker(), "pyint", {}],
    ['Column_6', Faker(), "pyint", {}],
    ['Column_7', Faker(), "pyint", {}],
    ['Column_8', Faker(), "pyint", {}],
    ['Column_9', Faker(), "pyint", {}],
    ['Column_10', Faker(), "pyint", {}],
    ['Column_11', Faker(), "pyint", {}],
    ['Column_12', Faker(), "pyint", {}],
    ['Column_13', Faker(), "pyint", {}],
    ['Column_14', Faker(), "pyint", {}],
    ['Column_15', Faker(), "pyint", {}],
    ['Column_16', Faker(), "pyint", {}],
    ['Column_17', Faker(), "pyint", {}],
    ['Column_18', Faker(), "pyint", {}],
    ['Column_19', Faker(), "pyint", {}],
    ['Column_20', Faker(), "pyint", {}],
    ['Column_21', Faker(), "pyint", {}],
    ['Column_22', Faker(), "pyint", {}],
    ['Column_23', Faker(), "pyint", {}],
]

def concurrent():
    with Manager() as dict_manager:
        data_frame = dict_manager.dict()

        global faker_function
        def faker_function(params):
            items = []
            for _ in range(0, num_rows - 1):
                items.append(getattr(params[1], params[2])(**params[3]))
            data_frame[params[0]] = items

        curr_time = time.time()
        with Pool(10) as p:
            p.map(faker_function, items)
        elapsed = time.time() - curr_time

        print('Concurrent', elapsed)
        print('Dict size', len(data_frame))

def linear():
    data_frame = {}

    def faker_function(params):
        items = []
        for _ in range(0, num_rows - 1):
            items.append(getattr(params[1], params[2])(**params[3]))
        data_frame[params[0]] = items

    curr_time = time.time()
    for item in items:
        faker_function(item)
    elapsed = time.time() - curr_time

    print('Linear time', elapsed)
    print('Dict size', len(data_frame))

if __name__ == "__main__":
    concurrent()

    linear()

Both functions, linear and concurrent, are supposed to generate some data and write it into a dictionary (for multiprocessing I am using a Manager object).

I have made the inner function, faker_function, global.

But when I run the app, I get this error from the worker processes:

Can't get attribute 'faker_function' on <module '__mp_main__'

What is the problem?


Solution

  • Let me back up and say this is absolutely the wrong way to use multiprocessing, for two reasons.

    #1) Multiprocessing only makes sense when each process is doing a lot of work. That work has to outweigh the cost of starting the process, sending the arguments to it, and receiving the results back. In this toy piece of code, this is almost certainly not the case.

    #2) Having multiple processes all modify a single shared data structure is a bad idea. You can get it to work, but it's extremely slow and error prone. What you want is for each process to return a value or values, and if a global variable needs to be modified, let that happen in the main process.

    So

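    # Define this at module level (not inside another function) so that
    # worker processes can look it up by name.
    def faker_function(params):
        items = []
        for _ in range(0, num_rows - 1):
            items.append(getattr(params[1], params[2])(**params[3]))
        # Return the column to the parent instead of writing to shared state.
        return params[0], items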
    

    and then in your main function:

    data_frame = {}
    with Pool(10) as p:
        for key, result in p.map(faker_function, items):
            data_frame[key] = result
    

    No need for a Manager.
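
As for the error itself, it most likely comes from where faker_function is defined. On platforms that start workers with the spawn method (the default on Windows and macOS), each worker re-imports the main script under the name __mp_main__ and looks the function up there by name. Since faker_function only comes into existence when concurrent() runs, and that call sits behind the `__main__` guard, the workers never see it. Here is a minimal sketch of the whole thing with the worker defined at module level. To keep it runnable without Faker, it uses the standard random module as a stand-in; make_column, build_serial, build_parallel, ITEMS and the smaller NUM_ROWS are names and values made up for illustration, not part of the original code.

```python
import random
import time
from multiprocessing import Pool

NUM_ROWS = 10_000  # assumption: kept small here; the question used 1_000_000

# Hypothetical stand-in for the Faker providers in the question:
# (column name, name of a function in the `random` module, positional args).
ITEMS = [
    ("Column_1", "randint", (0, 9999)),
    ("Column_2", "choice", (["Kayden", "Franklin", "Gabriel", "Vincent"],)),
    ("Column_3", "choice", (["Miller", "Ward", "Edwards", "Parry"],)),
    ("Column_4", "randint", (0, 9999)),
]


def make_column(params):
    """Build one column and return it; defined at module level so that
    worker processes can find it by name when the task is unpickled."""
    name, func_name, args = params
    generate = getattr(random, func_name)
    return name, [generate(*args) for _ in range(NUM_ROWS)]


def build_serial(items):
    # Plain loop in the current process.
    return dict(make_column(item) for item in items)


def build_parallel(items, processes=4):
    # Each worker returns (name, values); the parent assembles the dict,
    # so no Manager or shared dictionary is needed.
    with Pool(processes) as pool:
        return dict(pool.map(make_column, items))


if __name__ == "__main__":
    for label, build in (("Serial", build_serial), ("Parallel", build_parallel)):
        start = time.perf_counter()
        data_frame = build(ITEMS)
        elapsed = time.perf_counter() - start
        print(label, elapsed, "columns:", len(data_frame))
```

With only a few small columns like this, the parallel version may well come out slower than the serial one, which is point #1 above. No timings are shown because they depend on the machine and the start method.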