Search code examples
pythonmultiprocessingthrottlingjoblib

Using throttler and joblib for heavy jobs and getting "cannot unpack non-iterable function object"


I have a lot of heavy jobs which need to send throttled HTTP requests (2 requests/sec) and calculate the results based on the response values.

I use throttler to control the request rate, and use joblib to calculate the results on multi-processes:

import asyncio
from math import sqrt
import time
from joblib import Parallel, delayed
from throttler import throttle


@throttle(rate_limit=1, period=0.5)
async def request(i):
    """
    Do nothing. Just sleep to mimic HTTP requests.
    """
    time.sleep(0.3)
    print(f"get response: {i} {time.time()}")
    return i


def heavy_calculate(i):
    """
    Just sqrt and square the number i for 30_000_000 times to mimik heavy job.
    It takes about 2.1 sec for each call.
    """
    n = i
    for _ in range(30_000_000):
        n = sqrt(n)
        n = n ** 2
    print(f"heavy_calculate: {n} {time.time()}")
    return n


async def many_tasks(count: int):
    print(f"=== START === {time.time()}")
    with Parallel(n_jobs=-1) as parallel:
        a = []
        coros = [request(i) for i in range(count)]
        for coro in asyncio.as_completed(coros):
            i = await coro

            #! Note: this heavy_calculate will block the request rate too
            results = heavy_calculate(i)

            #! Try to using the following call instead, but it raises error "cannot unpack non-iterable function object"
            # results = parallel(delayed(heavy_calculate)(i))

            a.append(results)
    print(f"=== END === {time.time()}")
    print(a)

# Start run
asyncio.run(many_tasks(10))

The above code shows two functions should be run:

  • request: mimic HTTP requests which is throttled with 0.5 sec.
  • heavy_calculate: mimic CPU-bound calculation which will block other jobs.

I want to call request with throttling and run heavy_calculate on multi-processes, but it shows error:

...
TypeError: cannot unpack non-iterable function object

Any suggestion?


Solution

  • You could use the AsyncIO loop.run_in_exectutor function to execute the CPU intensive work on a multi-processing pool, avoiding blocking the event loop:

    import asyncio
    from math import sqrt
    from concurrent.futures import ProcessPoolExecutor
    from throttler import throttle
    
    
    @throttle(rate_limit=1, period=0.5)
    async def fetch(i: int) -> int:
        await asyncio.sleep(0.3)
        return i
    
    
    def heavy_work(i: int) -> int:
        for _ in range(30_000_000):
            _ = sqrt(i) ** 2
        return i
    
    
    async def fetch_and_process(i: int, executor: ProcessPoolExecutor) -> int:
        i = await fetch(i)
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(executor, heavy_work, i)
    
    
    async def main():
        with ProcessPoolExecutor() as executor:
            results = await asyncio.gather(
                *(fetch_and_process(i, executor) for i in range(20))
            )
            print(results)
    
    
    if __name__ == "__main__":
        asyncio.run(main())
    

    This would run many fetch_and_process concurrently within the throttling limit applied to the fetch instruction on as many CPU cores as available on your computer.

    Note that your fetch method has a flaw, you should use asyncio.sleep to simulate network latency and not time.sleep as the former will block the event loop.