I have a Python script with the following at the top of the file:
result_queue = Queue.Queue()
key_list = *a large list of small items* #(actually from bucket.list() via boto)
I have learned that Queues are process-safe data structures. I have a method:
def enqueue_tasks(keys):
    for key in keys:
        try:
            result = perform_scan.delay(key)
            result_queue.put(result)
        except Exception:
            print "failed"
The perform_scan.delay() function here actually calls a Celery worker, but I don't think that is relevant (it is an asynchronous process call).
I also have:
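from itertools import izip_longest

def grouper(iterable, n, fillvalue=None):
    # Collect items into fixed-length groups; the last group is padded with fillvalue.
    args = [iter(iterable)] * n
    return izip_longest(fillvalue=fillvalue, *args)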
Lastly, I have a main():
def main():
    executor = concurrent.futures.ProcessPoolExecutor(10)
    futures = [executor.submit(enqueue_tasks, group) for group in grouper(key_list, 40)]
    print result_queue.qsize()
The print statement outputs 0. Yet if I print the size of result_queue inside enqueue_tasks while the program is running, I can see the size increasing as things are added to the queue.
Any ideas about what is happening?
It looks like there's a simpler solution to this problem.
You're building a list of futures. The whole point of futures is that they're future results. In particular, whatever each function returns, that's the (eventual) value of the future. So, don't do the whole "push results onto a queue" thing at all, just return them from the task function, and pick them up from the futures.
The simplest way to do this is to break that loop up so that each key is a separate task, with a separate future. I don't know whether that's appropriate for your real code, but if it is:
def do_task(key):
    try:
        return perform_scan.delay(key)
    except Exception:
        print "failed"

def main():
    executor = concurrent.futures.ProcessPoolExecutor(10)
    futures = [executor.submit(do_task, key) for key in key_list]
    # If you want to do anything with these results, you probably want
    # a loop around concurrent.futures.as_completed or similar here,
    # rather than waiting for them all to finish, ignoring the results,
    # and printing the number of them.
    print len(futures)
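The loop around concurrent.futures.as_completed mentioned in that comment could look roughly like the sketch below. It is written in Python 3 syntax, unlike the Python 2 code above. The names collect and future_to_key are made up for illustration, and the body of do_task is only a stand-in for perform_scan.delay(key). The executor is passed in as an argument so that the same function works with either a process pool or a thread pool.

```python
import concurrent.futures


def do_task(key):
    # Hypothetical stand-in for perform_scan.delay(key).
    return (key, len(key))


def collect(executor, keys):
    """Submit one task per key and gather each result as its future finishes."""
    # Map each future back to its key so failures can be reported by key.
    future_to_key = {executor.submit(do_task, key): key for key in keys}
    results, failed = [], []
    for future in concurrent.futures.as_completed(future_to_key):
        key = future_to_key[future]
        try:
            # result() returns whatever do_task returned, or re-raises
            # the exception that do_task raised in the worker.
            results.append(future.result())
        except Exception as exc:
            failed.append((key, exc))
    return results, failed
```

With the pool from main(), this would be called as collect(executor, key_list). Note that as_completed yields futures in completion order, so results is not guaranteed to follow the order of key_list.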
Of course that doesn't do the grouping. But do you need it?
The most likely reason for the grouping to be necessary is that the tasks are so tiny that the overhead in scheduling them (and pickling the inputs and outputs) swamps the actual work. If that's true, then you can almost certainly wait until a whole batch is done to return any results. Especially given that you're not even looking at the results until they're all done anyway. (This model of "split into groups, process each group, merge back together" is pretty common in cases like numerical work, where each element may be tiny, or elements may not be independent of each other, but there are groups that are big enough or independent from the rest of the work.)
At any rate, that's almost as simple:
def do_tasks(keys):
    results = []
    for key in keys:
        try:
            result = perform_scan.delay(key)
            results.append(result)
        except Exception:
            print "failed"
    return results

def main():
    executor = concurrent.futures.ProcessPoolExecutor(10)
    futures = [executor.submit(do_tasks, group) for group in grouper(key_list, 40)]
    # as_completed yields futures, so call result() to get each batch's list.
    print sum(len(future.result()) for future in concurrent.futures.as_completed(futures))
Or, if you prefer to first wait and then calculate:
def main():
    executor = concurrent.futures.ProcessPoolExecutor(10)
    futures = [executor.submit(do_tasks, group) for group in grouper(key_list, 40)]
    print sum(len(future.result()) for future in futures)
But again, I doubt you need even this.
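One thing to watch for if you do keep the grouped version: grouper pads the final group with fillvalue (None by default) whenever the number of keys is not a multiple of n, so the last batch can contain None "keys" that do_tasks would then try to scan. Below is a minimal sketch of a batching helper that does not pad, again in Python 3 syntax. The names chunks and run_batches are made up for illustration, and the body of do_tasks is a stand-in for the real per-key work.

```python
import concurrent.futures
from itertools import islice


def chunks(iterable, n):
    """Yield lists of up to n items; the last list may be shorter (no padding)."""
    it = iter(iterable)
    while True:
        chunk = list(islice(it, n))
        if not chunk:
            return
        yield chunk


def do_tasks(keys):
    # Hypothetical stand-in for calling perform_scan.delay on each key.
    return [key.upper() for key in keys]


def run_batches(executor, keys, n):
    """Submit one task per batch, then merge the per-batch result lists."""
    futures = [executor.submit(do_tasks, group) for group in chunks(keys, n)]
    merged = []
    # Iterating in submission order keeps results in the same order as keys.
    for future in futures:
        merged.extend(future.result())
    return merged
```

With the pool from main(), this would be called as run_batches(executor, key_list, 40).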