I have a generator that yields strings. How can I use it together with this code?
from multiprocessing.dummy import Pool as ThreadPool
pool = ThreadPool(4)
results = pool.map(my_function, my_array)
Instead of the array that is passed above, I want to pass a generator, since there are too many values to fit in a list and I do not need to store them.
Additional question: in that case, should advancing the generator to the next value happen inside the function?
Essentially the same question that was posed here.
The essence is that multiprocessing will convert any iterable without a __len__ method into a list.
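You can see this for yourself with a small sketch (gen and work here are just placeholders standing in for your real generator and worker):
from multiprocessing.dummy import Pool as ThreadPool

def gen():
    # placeholder generator standing in for your real one
    for i in range(5):
        print("yielding", i)
        yield i

def work(x):
    return x * 2

pool = ThreadPool(4)
# All five "yielding" lines print before any result is computed:
# the generator is drained into a list up front because it has no __len__.
results = pool.map(work, gen())
print(results)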
There is an open issue to add support for generators but for now, you're SOL.
If your array is too big to fit into memory, consider reading it in chunks, processing each chunk, and dumping the results to disk in chunks. Without more context, I can't really provide a more concrete solution.
UPDATE:
Thanks for posting your code. My first question: is it absolutely necessary to use multiprocessing? Depending on what my_function does, you may see no benefit to using a ThreadPool, since Python is famously limited by the GIL, so a CPU-bound worker function won't get any faster by running in threads. In that case, a process pool (multiprocessing.Pool) might be better. Otherwise, you are probably better off just running results = map(my_function, generator).
Of course, if you don't have the memory to load the input data, it is unlikely you will have the memory to store the results.
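For what it's worth, map in Python 3 is lazy, so a plain map over the generator lets you handle each result as it arrives without holding inputs or outputs in memory. A minimal sketch, assuming a placeholder generator:
def generator():
    # placeholder for your real generator
    for i in range(10):
        yield i

def my_function(x):
    return x

# map() in Python 3 is lazy: values are pulled from the generator and
# processed one at a time, nothing is materialized in bulk.
for result in map(my_function, generator()):
    print(result)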
Secondly, you can improve your generator by using itertools. Try:
import itertools
import string

letters = string.ascii_lowercase
cod = itertools.permutations(letters, 6)  # lazy generator of 6-letter permutations

def my_function(x):
    return x

def dump_results_to_disk(results, outfile):
    with open(outfile, 'w') as fp:
        for result in results:
            fp.write(str(result) + '\n')

def process_in_chunks(generator, chunk_size=50):
    accumulator = []
    chunk_number = 1
    for item in generator:
        accumulator.append(item)
        if len(accumulator) == chunk_size:
            # process and dump one full chunk, then start a fresh one
            results = list(map(my_function, accumulator))
            dump_results_to_disk(results, "results" + str(chunk_number) + ".txt")
            chunk_number += 1
            accumulator = []
    if accumulator:
        # process whatever is left over in the final, partial chunk
        results = list(map(my_function, accumulator))
        dump_results_to_disk(results, "results" + str(chunk_number) + ".txt")

process_in_chunks(cod)
Obviously, change my_function() to whatever your worker function is, and maybe you want to do something other than dumping to disk. You can scale chunk_size to however many entries fit in memory. If you don't have the disk space or the memory for the results, then there's really no way for you to process the data in aggregate.
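If you do decide the ThreadPool is worth it, one way to combine it with the chunking idea is to pull fixed-size chunks off the generator with itertools.islice and hand each chunk to pool.map. This is only a sketch along the same lines as process_in_chunks above (process_in_chunks_with_pool is a name I made up, not anything from multiprocessing):
import itertools
import string
from multiprocessing.dummy import Pool as ThreadPool

letters = string.ascii_lowercase
cod = itertools.permutations(letters, 6)

def my_function(x):
    return x

def dump_results_to_disk(results, outfile):
    with open(outfile, 'w') as fp:
        for result in results:
            fp.write(str(result) + '\n')

def process_in_chunks_with_pool(generator, chunk_size=50, workers=4):
    pool = ThreadPool(workers)
    chunk_number = 1
    while True:
        # islice takes at most chunk_size items without draining the whole generator
        chunk = list(itertools.islice(generator, chunk_size))
        if not chunk:
            break
        results = pool.map(my_function, chunk)
        dump_results_to_disk(results, "results" + str(chunk_number) + ".txt")
        chunk_number += 1
    pool.close()
    pool.join()

process_in_chunks_with_pool(cod)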