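python - Processing a queue with parallel/asynchronous requests
Problem description
I want to implement a parallel requests.get() function that processes a queue of requests and puts the results into a list, which ordinary sequential code then processes once everything has finished. I tried the following, but my code never terminates and does not print the IDs.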
import requests
from queue import Queue
from threading import Thread

BASE = 'http://www.uniprot.org'
KB_ENDPOINT = '/uniprot/'
FORMAT = ".xml"
num_threads = 10
ID_q = Queue()
ID_data = Queue()

# worker function
def get_ID_data(ID_q, ID_data, BASE, KB_ENDPOINT, FORMAT):
    while True:
        ID = ID_q.get()
        print(ID)
        ID_data.put(requests.get(BASE + KB_ENDPOINT + ID + FORMAT))
        ID_q.task_done()
        ID_data.task_done()

# initialize worker
for i in range(num_threads):
    worker = Thread(target=get_ID_data, args=(ID_q, ID_data, BASE, KB_ENDPOINT, FORMAT))
    worker.setDaemon(True)
    worker.start()

# load IDs and put in queue
ID_list = ["A6ZMA9", "N1P5E6",
           "H0GM11", "H0GZ91",
           "A0A0L8VK54", "G2WKA0",
           "C8ZEQ4", "B5VPH8",
           "B3LLU5", "C7GL72",
           "J8QFS9", "J8Q1C1",
           "A0A0L8RDV1"]
for ID in ID_list:
    ID_q.put(ID)
ID_q.join()

# work with ID_data
print(ID_data)
Update: using asyncio and aiohttp, I adapted @pkqxdd's answer to:
import asyncio, aiohttp

IDs = ["A6ZMA9", "N1P5E6",
       "H0GM11", "H0GZ91",
       "A0A0L8VK54", "G2WKA0",
       "C8ZEQ4", "B5VPH8",
       "B3LLU5", "C7GL72",
       "J8QFS9", "J8Q1C1",
       "A0A0L8RDV1"]
BASE = 'http://www.uniprot.org'
KB_ENDPOINT = '/uniprot/'
FORMAT = ".xml"

async def get_data_coroutine(session, ID):
    async with session.get(BASE + KB_ENDPOINT + ID + FORMAT) as response:
        res = await response.text()
        print(ID)
        if not res:
            raise NameError('{} is not available'.format(ID))
        return res

async def main(loop):
    async with aiohttp.ClientSession(loop=loop) as session:
        tasks = [get_data_coroutine(session, ID) for ID in IDs]
        return await asyncio.gather(*tasks)

loop = asyncio.get_event_loop()
result = loop.run_until_complete(main(loop))
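Solution
Since you mentioned async, I assume you are using Python 3.6 or later.
The requests library does not really support asynchronous programming, and trying to make it asynchronous is something of a dead end. A better idea is to use aiohttp instead.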
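To illustrate why a blocking library is a poor fit for asyncio, here is a rough sketch that uses only the standard library. time.sleep stands in for a blocking call such as requests.get, and asyncio.sleep stands in for a non-blocking aiohttp request. DELAY and all function names are hypothetical, and asyncio.run assumes Python 3.7 or later.

```python
import asyncio
import time

DELAY = 0.1  # hypothetical per-request latency, in seconds


async def blocking_fetch(ID):
    # Stand-in for requests.get(): time.sleep() blocks the whole event loop,
    # so no other coroutine can make progress while it runs.
    time.sleep(DELAY)
    return ID


async def non_blocking_fetch(ID):
    # Stand-in for an aiohttp request: awaiting hands control back to the
    # event loop, so the other "requests" can wait at the same time.
    await asyncio.sleep(DELAY)
    return ID


async def timed_gather(fetch, IDs):
    # Run fetch() for every ID via gather() and measure the elapsed time.
    start = time.perf_counter()
    results = await asyncio.gather(*(fetch(ID) for ID in IDs))
    return list(results), time.perf_counter() - start


def compare(IDs):
    # Returns ((results, seconds), (results, seconds)) for both variants.
    blocking = asyncio.run(timed_gather(blocking_fetch, IDs))
    non_blocking = asyncio.run(timed_gather(non_blocking_fetch, IDs))
    return blocking, non_blocking


if __name__ == "__main__":
    (_, slow), (_, fast) = compare(["A6ZMA9", "N1P5E6", "H0GM11", "H0GZ91"])
    print("blocking: {:.2f}s, non-blocking: {:.2f}s".format(slow, fast))
```

With the blocking stand-in, the total time grows with the number of IDs because each call holds the event loop. With the awaited stand-in, the waits overlap.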
You can achieve your goal with the following simple code:
import asyncio, aiohttp

BASE = 'http://www.uniprot.org'
KB_ENDPOINT = '/uniprot/'
FORMAT = ".xml"
ID_list = ["A6ZMA9", "N1P5E6",
           "H0GM11", "H0GZ91",
           "A0A0L8VK54", "G2WKA0",
           "C8ZEQ4", "B5VPH8",
           "B3LLU5", "C7GL72",
           "J8QFS9", "J8Q1C1",
           "A0A0L8RDV1"]

session = aiohttp.ClientSession()

async def get_data(ID):
    async with session.get(BASE + KB_ENDPOINT + ID + FORMAT) as response:
        return await response.text()

coros = []
for ID in ID_list:
    coros.append(get_data(ID))

loop = asyncio.get_event_loop()
fut = asyncio.gather(*coros)
loop.run_until_complete(fut)
print(fut.result())
(Yes, I saw the warning. But I really don't want to make the answer more complicated. You should adapt it to better suit your purposes.)
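The warning mentioned above most likely comes from creating aiohttp.ClientSession() at module level and never closing it. A minimal sketch of one way to avoid that, assuming Python 3.7 or later for asyncio.run, is to open the session inside a coroutine with async with, and to cap concurrency with a semaphore. The names fetch_all, fetch_one, and MAX_CONCURRENT are illustrative and not part of aiohttp. The limit of 10 simply mirrors num_threads from the question.

```python
import asyncio

BASE = 'http://www.uniprot.org'
KB_ENDPOINT = '/uniprot/'
FORMAT = ".xml"
MAX_CONCURRENT = 10  # assumed limit, mirroring num_threads in the question


async def fetch_all(IDs, fetch_one, limit=MAX_CONCURRENT):
    # Run fetch_one(ID) for every ID, with at most `limit` in flight at once.
    # gather() returns the results in the same order as IDs.
    semaphore = asyncio.Semaphore(limit)

    async def bounded(ID):
        async with semaphore:
            return await fetch_one(ID)

    return list(await asyncio.gather(*(bounded(ID) for ID in IDs)))


async def main(IDs):
    # Imported lazily so fetch_all() can be exercised without the
    # third-party dependency installed.
    import aiohttp

    # The session is created and closed inside a coroutine.
    async with aiohttp.ClientSession() as session:

        async def get_data(ID):
            async with session.get(BASE + KB_ENDPOINT + ID + FORMAT) as response:
                return await response.text()

        return await fetch_all(IDs, get_data)
```

Calling asyncio.run(main(ID_list)) should then return the response bodies in the same order as the IDs. Passing return_exceptions=True to gather is one option if a single failed request should not abort the whole batch.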