Tags: python, multithreading, threadpool, python-multithreading, python-decorators

Trying to add throttle control to parallel API calls in Python


I am using the Google Places API, which has a limit of 10 queries per second (QPS), so I cannot make more than 10 requests within any one second. With serial execution this would not be an issue: the API's average response time is 250 ms, so I could make only about 4 calls per second.

To use the full 10 QPS limit, I used multithreading to make the API calls in parallel. But now I need to control the number of calls made per second so that it never exceeds 10 (the Google API starts returning errors if I cross the limit).

Below is the code I have so far. I cannot figure out why the program sometimes gets stuck or takes a lot longer than it should.

import time
from datetime import datetime
import random
from threading import Lock
from concurrent.futures import ThreadPoolExecutor as pool
import concurrent.futures
import requests
import matplotlib.pyplot as plt
from statistics import mean
from ratelimiter import RateLimiter

def make_parallel(func, qps=10):
    lock = Lock()
    threads_execution_que = []
    limit_hit = False
    def qps_manager(arg):
        current_second = time.time()
        lock.acquire()
        if len(threads_execution_que) >= qps or limit_hit:
            limit_hit = True
            if current_second - threads_execution_que[0] <= 1:
                time.sleep(current_second - threads_execution_que[0])
        current_time = time.time()
        threads_execution_que.append(current_time)
        lock.release()

        res = func(arg)

        lock.acquire()
        threads_execution_que.remove(current_time)
        lock.release()
        return res

    def wrapper(iterable, number_of_workers=12):
        result = []
        with pool(max_workers=number_of_workers) as executer:
            bag = {executer.submit(func, i): i for i in iterable}
            for future in concurrent.futures.as_completed(bag):
                result.append(future.result())
        return result
    return wrapper

@make_parallel
def api_call(i):
    min_func_time = random.uniform(.25, .3)
    start_time = time.time()
    try:
        response = requests.get('https://jsonplaceholder.typicode.com/posts', timeout=1)
    except Exception as e:
        response = e
    if (time.time() - start_time) - min_func_time < 0:
        time.sleep(min_func_time - (time.time() - start_time))
    return response

api_call([1]*50)

Ideally the code should take no more than 1.5 seconds, but currently it takes about 12-14 seconds. The script runs at the expected speed as soon as I remove the QPS manager logic.

Please suggest what I am doing wrong, and also whether there is an existing package that provides this mechanism out of the box.


Solution

  • It looks like the ratelimit package does just that:

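    import requests
    from ratelimit import limits, sleep_and_retry

    # make_parallel is the decorator defined in the question.
    @make_parallel
    @sleep_and_retry             # sleep and retry instead of raising when the limit is hit
    @limits(calls=10, period=1)  # at most 10 calls per 1-second period
    def api_call(i):
        try:
            response = requests.get("https://jsonplaceholder.typicode.com/posts", timeout=1)
        except Exception as e:
            response = e
        return response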
    

    EDIT: I did some testing, and it looks like @sleep_and_retry is a little too optimistic, so just increase the period a little, to 1.2 seconds:

    from datetime import datetime, timedelta

    s = datetime.now()
    api_call([1] * 50)
    elapsed_time = datetime.now() - s
    # True if the run took longer than 50 / 10 = 5 seconds
    print(elapsed_time > timedelta(seconds=50 / 10))
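
As for why the original qps_manager misbehaves, a few things stand out in the posted code: it calls time.sleep while still holding the lock, so every other thread is blocked for the duration; it assigns limit_hit inside the nested function without declaring it nonlocal; it sleeps for the age of the oldest call rather than for the time remaining in the one-second window; and the wrapper submits func instead of qps_manager. If you would rather not depend on a package, the following is a minimal standard-library sketch of a sliding-window limiter that sleeps outside the lock. The names SlidingWindowLimiter, run_throttled, and fake_api_call are hypothetical, and fake_api_call merely stands in for the real HTTP request.

```python
import threading
import time
from collections import deque
from concurrent.futures import ThreadPoolExecutor


class SlidingWindowLimiter:
    """Allow at most `calls` acquisitions in any rolling `period` seconds."""

    def __init__(self, calls=10, period=1.0):
        self.calls = calls
        self.period = period
        self._lock = threading.Lock()
        self._stamps = deque()  # start times of calls still inside the window

    def acquire(self):
        while True:
            with self._lock:
                now = time.monotonic()
                # Forget calls that have already left the window.
                while self._stamps and now - self._stamps[0] >= self.period:
                    self._stamps.popleft()
                if len(self._stamps) < self.calls:
                    self._stamps.append(now)
                    return
                # Time until the oldest recorded call leaves the window.
                wait = self.period - (now - self._stamps[0])
            # Sleep with the lock released so other threads can proceed.
            time.sleep(wait)


def run_throttled(func, iterable, limiter, number_of_workers=12):
    """Run func over iterable in a thread pool, gated by the limiter."""
    def task(arg):
        limiter.acquire()
        return func(arg)

    with ThreadPoolExecutor(max_workers=number_of_workers) as executor:
        return list(executor.map(task, iterable))


def fake_api_call(i):
    time.sleep(0.05)  # hypothetical stand-in for requests.get(...)
    return i


limiter = SlidingWindowLimiter(calls=10, period=1.0)
start = time.monotonic()
results = run_throttled(fake_api_call, range(25), limiter)
elapsed = time.monotonic() - start
# 25 calls at 10 per second cannot finish in under 2 seconds.
print(len(results), round(elapsed, 2))
```

Because the window is rolling rather than fixed, this sketch never starts more than 10 calls in any one-second span as measured on the client. The server may count slightly differently, so leaving some headroom (for example a period of 1.2 seconds, as above) is still a reasonable precaution.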