
Multiprocessing pool: running the same function in 60 processes at a time


So, the story: I have a list of accounts that I need to filter. More specifically, there's a list of accounts on a specific website that hold many coins, and I need to filter 2 million of those accounts by balance.

So I want to process at least 100 accounts at a time. The example below is what I tried.

from multiprocessing import Pool

max_proc = 3

list_of_texts =[
    'acc_1',
    'acc_2',
    'acc_3',
    'acc_4',
    'acc_5',
    'acc_6',
    'acc_7',
    'acc_8',
    'acc_9',
    'acc_10',
    'acc_11',
]

def Start(text):
    print(text)

if __name__=='__main__':

    for index in range(0, len(list_of_texts), max_proc):

        pool = Pool(processes=max_proc)

        for item in list_of_texts[index:index+max_proc]:
            pool.map(Start, [item])

However, I can't get it to work the way I want: it is still not parallel. The example above should start 3 processes for the first 3 accounts in the list, finish them, then process the next 3 accounts, and so on until the list is done, with each group running in parallel. Instead it still runs sequentially: it processes acc_1, then acc_2, and so on.

max_proc = 3 is just for testing; I will use 60 in production.

How can I achieve what I want?


Solution

  • You can split your list into groups of 3 and then run map over those groups, like this (note that start_fun then receives a whole group per call):

    from concurrent.futures import ProcessPoolExecutor
    max_proc =3
    
    def start_fun(text):
        print(text)
    
    if __name__ == "__main__":
        gen_of_texts =[
        'acc_1',
        'acc_2',
        'acc_3',
        'acc_4',
        'acc_5',
        'acc_6',
        'acc_7',
        'acc_8',
        'acc_9',
        'acc_10',
        'acc_11',
        ]
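        with ProcessPoolExecutor(max_proc) as pool:
            # Split the list into groups of max_proc. Slicing keeps the final
            # partial group (acc_10, acc_11), which zip(*(iter(...),) * max_proc)
            # would silently drop.
            groups = [gen_of_texts[i:i + max_proc] for i in range(0, len(gen_of_texts), max_proc)]
            pool.map(start_fun, groups)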
    

    Using imap, which returns the results lazily and in input order:

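    from itertools import islice
    from multiprocessing import Pool
    max_proc = 300
    
    def start_fun(text):
        return text
    
    def chunks(iterable, size):
        # Yield tuples of up to `size` items; unlike the zip(*(iter(...),) * size)
        # idiom, the last (shorter) group is kept instead of being dropped.
        it = iter(iterable)
        while True:
            chunk = tuple(islice(it, size))
            if not chunk:
                return
            yield chunk
    
    if __name__ == "__main__":
        list_of_texts = ('GDUI73F5LZA47F4CBUNDT7FLWJ4U6DJQHDU3N3L55AONLUFV2QANT5D4' for _ in range(10**6))
        with Pool(max_proc) as pool:
            for res in pool.imap(start_fun, chunks(list_of_texts, max_proc), chunksize=max_proc):
                print(res)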
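
    For what it's worth, the code in the question runs sequentially because each pool.map(Start, [item]) call blocks until its one-item list is finished, so only one account is ever in flight, and a new pool is created for every batch. Below is a minimal sketch of a simpler alternative, assuming the per-account work is independent: create one pool and hand it the whole list. The names check_account and process_accounts are hypothetical, and the body of check_account is only a placeholder for the real balance check.

```python
from multiprocessing import Pool

MAX_PROC = 3  # test value from the question; 60 is planned for production


def check_account(account):
    """Hypothetical stand-in for the real per-account work (e.g. a balance lookup)."""
    return account, len(account)


def process_accounts(accounts, max_proc=MAX_PROC):
    # One pool for the whole run: each worker takes the next account as soon
    # as it finishes the previous one, so up to max_proc accounts are processed
    # at the same time and no manual batching is needed.
    with Pool(processes=max_proc) as pool:
        # map() blocks until every account is done and returns results in input order.
        return pool.map(check_account, accounts)


if __name__ == "__main__":
    accounts = [f"acc_{i}" for i in range(1, 12)]
    for account, result in process_accounts(accounts):
        print(account, result)
```

    With 2 million accounts, pool.imap(check_account, accounts, chunksize=...) may be preferable to map, since it yields results as they become available instead of building the full result list first. The right chunk size is something to measure rather than assume.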