
How do I fairly assign tasks to workers in Python? - Splitting iterable into similarly sized chunks


I have workers and tasks to do:

workers = ['peter', 'paul', 'mary']
tasks = range(13)

Now I want to split the tasks into chunks or batches of work, so that each worker can work on one batch and does about the same amount of work as everybody else. In real life I want to schedule batch jobs on a compute farm. The batch jobs are supposed to run in parallel; the actual scheduling and dispatch is done by a commercial-grade tool such as LSF or Grid.

Some examples of what I would expect:

>>> distribute_work(['peter', 'paul', 'mary'], range(3))
[('peter', [0]), ('paul', [1]), ('mary', [2])]
>>> distribute_work(['peter', 'paul', 'mary'], range(6))
[('peter', [0, 3]), ('paul', [1, 4]), ('mary', [2, 5])]
>>> distribute_work(['peter', 'paul', 'mary'], range(5))
[('peter', [0, 3]), ('paul', [1, 4]), ('mary', [2])]

This question is very similar to the questions here, here, and here

The difference is that I want these features, in order of precedence:

  1. No use of len and, if possible, no build-up of long data structures internally
  2. Accept a generator
  3. Return generators
  4. As much use of stdlib components as possible

Some side notes on requirements:

  • No dicts on purpose: I have workers with the same name that can do multiple batches (they are Unix hostnames). If your solution uses dicts, that's fine, because we can always look a worker up by a batch enumeration.
  • Arbitrary length: Both workers and tasks can be iterables of any length >= 1, and they do not have to split evenly, as shown in the example above where Mary gets only one task.
  • Order: Not important to me. I guess others may prefer an ordering like [0, 1], [2, 3], [4], but I don't care. If your solution can keep or switch the order, that may be worth pointing out to others.

I have tried to wrap my head around itertools and this particular problem and came up with the following code to illustrate the question:

from itertools import cycle, groupby

def distribute_work(workers, tasks):
    # Tag each task with a worker index in round-robin order, then sort by
    # that index and group, collecting each worker's tasks into a batch.
    batches = range(len(workers))
    tagged = sorted(zip(cycle(batches), tasks), key=lambda t: t[0])
    return [(workers[k], [task for _, task in group])
            for k, group in groupby(tagged, key=lambda t: t[0])]

This satisfies 4, but the sort very likely violates 1, and 2 and 3 are not addressed at all. One lazier direction I tried is sketched below.
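
One way to drop the sort: itertools.tee gives each worker an independent view of the task stream, and islice lazily picks every n-th task from that view. A minimal sketch (distribute_work_lazy is just my name for it; note that tee buffers internally when its iterators are consumed at very different rates, so I'm not certain it truly satisfies 1):

from itertools import islice, tee

def distribute_work_lazy(workers, tasks):
    workers = list(workers)            # workers are few; tasks stay lazy
    n = len(workers)                   # len on workers only, never on tasks
    views = tee(tasks, n)              # one independent iterator per worker
    # worker i lazily receives every n-th task, starting at offset i
    return [(w, islice(view, i, None, n))
            for i, (w, view) in enumerate(zip(workers, views))]

This accepts a generator for tasks and returns generators:

>>> [(w, list(g)) for w, g in distribute_work_lazy(['peter', 'paul', 'mary'], range(5))]
[('peter', [0, 3]), ('paul', [1, 4]), ('mary', [2])]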

Probably there's some easy solution to this, combining some stdlib components in a way I haven't thought of. But maybe not. Any takers?


Solution

  • I think you want to use multiprocessing.Pool.imap to handle your workers and allocate their jobs. I believe it does everything you want.

    jobs = (some generator)                   # can consume jobs from a generator
    pool = multiprocessing.Pool(3)            # set number of workers here
    results = pool.imap(process_job, jobs)    # returns a generator
    
    for r in results:                         # loop will block until results arrive
        do_something(r)
    

    If the order of the results doesn't matter for your application, you can also use imap_unordered.
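
    For completeness, a self-contained sketch of that pattern (process_job here is a stand-in for your real batch job; the __main__ guard matters because multiprocessing may start fresh interpreter processes that re-import the module):

    import multiprocessing

    def process_job(job):
        return job * job                       # stand-in for the real batch job

    if __name__ == '__main__':
        jobs = (n for n in range(13))          # tasks can come from a generator
        with multiprocessing.Pool(3) as pool:  # three workers
            for result in pool.imap(process_job, jobs):
                print(result)                  # blocks until each result arrives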