python - 如何使用 Python 以不同的输入异步调用 API 一百万次
问题描述
我有一个 ID 列表(大约 1 百万),我想对其进行迭代并对 API 端点进行异步调用并将响应存储在文件中。
到目前为止,我已经搜索了很多方法,但它们都只使用一个 ID 进行调用,找不到使用不同 ID 有效调用相同 API URL 的方法。
我了解性能影响,所以我想结合使用 asyncio、aiohttp,有效地利用 CPU 内核和线程来进行并行调用。(并发,多处理)
我的计划是将大列表分成 10 个 ID 的小块,然后将它们发送到不同的线程。
样本数据: id_data = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11'、'12'、'13'、'14'、'15'、'16'、'17'、'18'、'19'、'20'、'21'、'22'、'23 ', '24', '25', '26', '27', '28', '29'] 等等直到数百万...
API Endpoint = .get('https://random.url.of.web/v70/services/每个Id都来这里')
我现在的实现:
id_data = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19', '20', '21', '22', '23', '24', '25', '26', '27', '28', '29']
def divide_chunks(l, n):
for i in range(0, len(l), n):
yield l[i:i + n]
async def get_and_scrape_pages(num_pages: list,output_file: str):
async with \
aiohttp.ClientSession(headers=headers) as client, \
aiofiles.open(output_file, "a+", encoding="utf-8") as f:
for z in range(len(num_pages)):
async with client.get('https://random.url.of.web/v70/services/'+id_data[z]) as response:
print(z+1,accid_data[z],response.status)
page = await response.text() + "-----" + str(z+1)
#print(page)
await f.write(page + "\n")
await f.write("\n")
def start_scraping(num_pages: list,output_file: str):
print("\n\tscraping now...\n")
x = list(divide_chunks(accid_data, 10))
for p in range(len(x)):
asyncio.run(get_and_scrape_pages(x[p],output_file))
def main():
NUM_API = 30
NUM_CORES = cpu_count() # Our number of CPU cores (including logical cores)
OUTPUT_FILE = "C://Users//40102046//eclipse-workspace//api_extraction//logs_1.txt" # File to append our scraped titles to
print("number of CPU cores (including logical cores)",NUM_CORES)
PAGES_PER_CORE = floor(NUM_API / NUM_CORES)
PAGES_FOR_FINAL_CORE = PAGES_PER_CORE + NUM_API % PAGES_PER_CORE # For our final core
print("PAGES_PER_CORE",PAGES_PER_CORE)
print("PAGES_FOR_FINAL_CORE",PAGES_FOR_FINAL_CORE)
futures = []
with concurrent.futures.ProcessPoolExecutor(NUM_CORES) as executor:
for i in range(NUM_CORES):
new_future = executor.submit(
start_scraping(),
num_pages=PAGES_PER_CORE,
output_file=OUTPUT_FILE,
)
print("from main def:",i+1)
futures.append(new_future)
futures.append(
executor.submit(
start_scraping, PAGES_FOR_FINAL_CORE, OUTPUT_FILE
)
)
concurrent.futures.wait(futures)
if __name__ == "__main__":
main()
这段代码:每次总是向所有线程发送 10 个“ID”的第一个列表。我想将每个嵌套列表分别发送到不同的线程, 感谢任何帮助或指导。
解决方案
以下是我认为导致您正在处理的问题的一些错误,以及一些建议。
z
应该迭代 的值num_pages
,而不是其长度的范围。这应该重命名为类似的东西id_chunk
以反映其内容。
for z in chunk_ids:
async with client.get(
'https://random.url.of.web/v70/services/' + z) as response:
print(z, response.status)
page = await response.text() + "-----" + str(z)
# print(page)
await f.write(page + "\n")
- 我会将计算
n
转移到divide_chunks
函数中,使num_chunks
(即核心数)成为该函数的参数。您需要确保该divide_chunks
函数返回所有块,包括最后一个可能小于n
. - 我会仔细检查您的代码
main
,因为我目前没有看到工作在对start_scraping
. 在main
中,您需要指定调用start_scraping
应该在其上运行的块,并调用divide_chunks
inmain
而不是start_scraping
. 来自的任何调用start_scraping
都不应触及全局变量。他们关心的块在本地可用,并且调用get_and_scrape_pages
提供了它需要的数据作为参数。 - 实现看起来比它需要的更复杂。例如,我认为异步运行文件 I/O 不会在性能上获得太多收益。我也一般不鼓励使用全局变量。
推荐阅读
- r - Leaflet r地图保存问题
- javascript - Javascript appendChild 不适用于 li 元素
- linux - 以 iso 格式创建 OS 映像
- reactjs - import & export may only appear at the top level - CRA React App with Typescript
- angularjs - AngularJs ui-router 成功过渡获取控制器
- google-cloud-platform - 从云功能插入扳手的会话泄漏
- sapjco3 - SAPJCo 是特定于 SAP 安装还是可以在不同的 SAP Box 之间重复使用?
- javascript - 为什么第二次迭代后 b 的值会自行增加?
- wordpress - 防止 CMS 识别
- javascript - 检查您是从主页还是子页面重定向的方法