首页 > 解决方案 > 尝试将油门控制添加到 python 中的并行 API 调用

问题描述

我正在使用 Google Places API,它的每秒查询限制为10。这意味着我不能在一秒钟内发出超过 10 个请求。如果我们使用串行执行,这不会是一个问题,因为 API 的平均响应时间是 250 毫秒,所以我将能够在一秒钟内进行 4 次调用。

为了利用整个10 QPS 限制,我使用了多线程并进行了并行 API 调用。但是现在我需要控制一秒钟内可能发生的调用次数,它不应该超过10(如果我超过限制,google API 开始抛出错误)

以下是我到目前为止的代码,我无法弄清楚为什么程序有时会卡住或花费的时间比要求的要长。

import time
from datetime import datetime
import random
from threading import Lock
from concurrent.futures import ThreadPoolExecutor as pool
import concurrent.futures
import requests
import matplotlib.pyplot as plt
from statistics import mean
from ratelimiter import RateLimiter

def make_parallel(func, qps=10):
    lock = Lock()
    threads_execution_que = []
    limit_hit = False
    def qps_manager(arg):
        current_second = time.time()
        lock.acquire()
        if len(threads_execution_que) >= qps or limit_hit:
            limit_hit = True
            if current_second - threads_execution_que[0] <= 1:
                time.sleep(current_second - threads_execution_que[0])
        current_time = time.time()
        threads_execution_que.append(current_time)
        lock.release()

        res = func(arg)

        lock.acquire()
        threads_execution_que.remove(current_time)
        lock.release()
        return res

    def wrapper(iterable, number_of_workers=12):
        result = []
        with pool(max_workers=number_of_workers) as executer:
            bag = {executer.submit(func, i): i for i in iterable}
            for future in concurrent.futures.as_completed(bag):
                result.append(future.result())
        return result
    return wrapper

@make_parallel
def api_call(i):
    min_func_time = random.uniform(.25, .3)
    start_time = time.time()
    try:
        response = requests.get('https://jsonplaceholder.typicode.com/posts', timeout=1)
    except Exception as e:
        response = e
    if (time.time() - start_time) - min_func_time < 0:
        time.sleep(min_func_time - (time.time() - start_time))
    return response

api_call([1]*50)

理想情况下,代码不应超过 1.5 秒,但目前大约需要 12-14 秒。 一旦我删除 QPS 管理器逻辑,脚本就会加速到它的预期速度。

请提出我做错了什么,并且,如果已经有任何可用的软件包可以开箱即用地执行此机制。

标签: pythonmultithreadingthreadpoolpython-multithreadingpython-decorators

解决方案


看起来ratelimit就是这样做的:

from ratelimit import limits, sleep_and_retry

@make_parallel
@sleep_and_retry
@limits(calls=10, period=1)
def api_call(i):
    try:
        response = requests.get("https://jsonplaceholder.typicode.com/posts", timeout=1)
    except Exception as e:
        response = e
    return response

编辑:我做了一些测试,看起来@sleep_and_retry有点过于乐观了,所以稍微增加一点,到 1.2 秒:

s = datetime.now()
api_call([1] * 50)
elapsed_time = datetime.now() - s
print(elapsed_time > timedelta(seconds=50 / 10))

推荐阅读