首页 > 解决方案 > 如何使用 Python 以不同的输入异步调用 API 一百万次

问题描述

我有一个 ID 列表(大约 1 百万),我想对其进行迭代并对 API 端点进行异步调用并将响应存储在文件中。

到目前为止,我已经搜索了很多方法,但它们都只使用一个 ID 进行调用,找不到使用不同 ID 有效调用相同 API URL 的方法。

我了解性能影响,所以我想结合使用 asyncio、aiohttp,有效地利用 CPU 内核和线程来进行并行调用。(并发,多处理)

我的计划是将大列表分成 10 个 ID 的小块,然后将它们发送到不同的线程。

样本数据: id_data = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11'、'12'、'13'、'14'、'15'、'16'、'17'、'18'、'19'、'20'、'21'、'22'、'23 ', '24', '25', '26', '27', '28', '29'] 等等直到数百万...

API Endpoint = .get('https://random.url.of.web/v70/services/每个Id都来这里')

我现在的实现:

id_data = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19', '20', '21', '22', '23', '24', '25', '26', '27', '28', '29']

def divide_chunks(l, n): 
    for i in range(0, len(l), n):  
        yield l[i:i + n]

async def get_and_scrape_pages(num_pages: list,output_file: str):
    async with \
    aiohttp.ClientSession(headers=headers) as client, \
    aiofiles.open(output_file, "a+", encoding="utf-8") as f:
        for z in range(len(num_pages)):
            async with client.get('https://random.url.of.web/v70/services/'+id_data[z]) as response:
                print(z+1,accid_data[z],response.status)
                page = await response.text() + "-----" + str(z+1)
                #print(page)
                
                await f.write(page + "\n")
        await f.write("\n")

def start_scraping(num_pages: list,output_file: str):
    print("\n\tscraping now...\n")
    x = list(divide_chunks(accid_data, 10)) 
    for p in range(len(x)):
        asyncio.run(get_and_scrape_pages(x[p],output_file))


def main():
    NUM_API = 30
    NUM_CORES = cpu_count() # Our number of CPU cores (including logical cores)
    OUTPUT_FILE = "C://Users//40102046//eclipse-workspace//api_extraction//logs_1.txt" # File to append our scraped titles to
    
    print("number of CPU cores (including logical cores)",NUM_CORES)
    PAGES_PER_CORE = floor(NUM_API / NUM_CORES)
    PAGES_FOR_FINAL_CORE = PAGES_PER_CORE + NUM_API % PAGES_PER_CORE # For our final core
    
    print("PAGES_PER_CORE",PAGES_PER_CORE)
    print("PAGES_FOR_FINAL_CORE",PAGES_FOR_FINAL_CORE)
    
    futures = []
    
    with concurrent.futures.ProcessPoolExecutor(NUM_CORES) as executor:
        for i in range(NUM_CORES): 
            new_future = executor.submit(
                start_scraping(),
                num_pages=PAGES_PER_CORE,
                output_file=OUTPUT_FILE,
            )
            print("from main def:",i+1)
            futures.append(new_future)

        futures.append(
            executor.submit(
                start_scraping, PAGES_FOR_FINAL_CORE, OUTPUT_FILE
            )
        )

    concurrent.futures.wait(futures)
  
if __name__ == "__main__":
    main()

这段代码:每次总是向所有线程发送 10 个“ID”的第一个列表。我想将每个嵌套列表分别发送到不同的线程, 感谢任何帮助或指导。

标签: pythonmultithreadingapimultiprocessingpython-asyncio

解决方案


以下是我认为导致您正在处理的问题的一些错误,以及一些建议。

  1. z应该迭代 的值num_pages,而不是其长度的范围。这应该重命名为类似的东西id_chunk以反映其内容。
       for z in chunk_ids:
            async with client.get(
                    'https://random.url.of.web/v70/services/' + z) as response:
                print(z, response.status)
                page = await response.text() + "-----" + str(z)
                # print(page)
                await f.write(page + "\n")
  1. 我会将计算n转移到divide_chunks函数中,使num_chunks(即核心数)成为该函数的参数。您需要确保该divide_chunks函数返回所有块,包括最后一个可能小于n.
  2. 我会仔细检查您的代码main,因为我目前没有看到工作在对start_scraping. 在main中,您需要指定调用start_scraping应该在其上运行的块,并调用divide_chunksinmain而不是start_scraping. 来自的任何调用start_scraping都不应触及全局变量。他们关心的块在本地可用,并且调用get_and_scrape_pages提供了它需要的数据作为参数。
  3. 实现看起来比它需要的更复杂。例如,我认为异步运行文件 I/O 不会在性能上获得太多收益。我也一般不鼓励使用全局变量。

推荐阅读