I want to use multiprocessing in Python to sort independent lists.
For example, I have a dictionary with int keys and list values.
I tried to implement a simple program, but I'm having difficulty storing the sorted lists back in a defaultdict and returning it to the main module.
from multiprocessing import Process
def fun(id, user_data):
    """Sort ``user_data`` in place and return that same list object.

    ``id`` is accepted for the caller's convenience but is not used.
    """
    result = user_data
    result.sort()
    return result
# users_data is a defaultdict of id as key and a list as a value
# NOTE(review): users_data is not defined in this snippet; it is assumed to
# exist at module level before this guard runs.
if __name__ == '__main__':
    for id,user_data in users_data.items():
        # One child process per list; fun's return value is not collected anywhere.
        P= Process(target=fun,args=(id,user_data))
        P.start()
        # join() right after start() waits for this child to finish before the
        # loop starts the next one, so the lists are sorted one at a time.
        P.join()
You'll need to use a Manager to share data between processes.
Also, as @Tomerikoo mentioned in the comments, the way you are doing it right now will not actually run in parallel: calling P.join()
immediately after P.start()
makes the script pause until that process finishes, so the processes run one after another (serially) rather than in parallel.
You can do something like this:
from multiprocessing import Process, Manager
def sort_list(user_id, user_data, interprocess_dict):
    """Sort ``user_data`` in place, then store it in ``interprocess_dict``.

    Returns None; the sorted list is delivered only through the dict.
    """
    ordered = user_data
    # Sort before assigning: a Manager dict proxy sends a copy of the value at
    # assignment time, so later in-place changes would not reach the parent.
    ordered.sort()
    interprocess_dict[user_id] = ordered
# Sample input: user id -> unsorted list of ints (insertion order 1, then 3).
users_data = {
    1: [5, 2, 1],
    3: [10, 12, 1],
}
def main():
    """Sort every list in ``users_data`` in its own process and print the results.

    Each child process receives its own list, sorts it, and writes it into a
    dict shared through a Manager; the parent prints that shared dict once
    every child has been joined.
    """
    # The context manager shuts the Manager's server process down
    # deterministically instead of leaving it to garbage collection.
    with Manager() as manager:
        interprocess_dict = manager.dict()
        processes = []
        # Start every process before joining any of them so they run in parallel.
        for user_id, user_data in users_data.items():
            proc = Process(target=sort_list, args=(user_id, user_data, interprocess_dict,))
            processes.append(proc)
            proc.start()
        for proc in processes:
            proc.join()
        # Reading through the proxy needs the Manager to still be running,
        # so the results are printed inside the with block.
        for user_id, user_data in interprocess_dict.items():
            print('{}: {}'.format(user_id, user_data))
# Run main() only when executed as a script, not when this module is imported
# (child processes may import it, e.g. under the 'spawn' start method).
if __name__ == '__main__':
    main()
EDIT:
It's better to limit the number of processes to the number of available CPU cores, since sorting a list is a purely CPU-bound operation.
import multiprocessing as mp
def sort_list(user_id, user_data, interprocess_dict):
    """Pool task: sort one user's list and record it under ``user_id``.

    Returns None; the result is delivered only through ``interprocess_dict``.
    """
    # Mutate first -- a shared (Manager) dict stores a snapshot on assignment.
    user_data.sort()
    target = interprocess_dict
    target[user_id] = user_data
def prepare_data(num_users=1000, list_length=10000):
    """Build sample input: ``num_users`` independent lists in descending order.

    Args:
        num_users: number of entries; keys are ``0 .. num_users - 1``.
        list_length: length of each list; every list is
            ``[list_length, list_length - 1, ..., 1]``.

    Returns:
        dict mapping each key to its own (unshared) list object.
    """
    # The comprehension evaluates list(range(...)) once per key, so no two
    # keys share the same list object.
    return {i: list(range(list_length, 0, -1)) for i in range(num_users)}
def main():
    """Sort every prepared list on a process pool and print the results.

    The pool is sized to the CPU count because sorting is CPU-bound. Each
    task writes its sorted list into a Manager-backed dict, which the parent
    prints after all tasks have finished.
    """
    # To force the 'spawn' start method, call mp.set_start_method('spawn')
    # once before creating the pool; it is accepted on all platforms.
    with mp.Manager() as manager, mp.Pool(mp.cpu_count()) as pool:
        interprocess_dict = manager.dict()
        # Data is built after the pool exists, as in the original ordering.
        users_data = prepare_data()
        pending = [
            pool.apply_async(sort_list, args=(user_id, user_data, interprocess_dict,))
            for user_id, user_data in users_data.items()
        ]
        # apply_async drops worker exceptions unless .get() is called; waiting
        # on every result also guarantees all tasks finished before the pool
        # is terminated on leaving the with block.
        for result in pending:
            result.get()
        # Print while the Manager is still running so the proxy is readable.
        for user_id, user_data in interprocess_dict.items():
            print('{}: {}'.format(user_id, user_data))
# Run main() only when executed as a script, not when this module is imported
# (child processes may import it, e.g. under the 'spawn' start method).
if __name__ == '__main__':
    main()