python - 同时获取队列中的请求
问题描述
我编写的代码允许我在处理前一个数据块的同时开始从 API 获取下一个数据块。
我希望在任何给定时刻最多同时获取 5 个块,并且即使队列中最后一个请求先于其他请求完成,返回的数据也应始终按正确的顺序处理。
如何更改我的代码以实现这一点?
class MyClient:
    async def fetch_entities(
        self,
        entity_ids:List[int],
        objects:Optional[List[str]],
        select_inbound:Optional[List[str]]=None,
        select_outbound:Optional[List[str]]=None,
        queue_size:int=5,
        chunk_size:int=500,
    ):
        """
        Fetch entities in chunks of ``chunk_size`` ids.

        While one chunk of data is being processed the next one can
        already be fetched. In other words: Data processing does not
        block data fetching.

        NOTE(review): as written, each POST is awaited *before* its
        response is enqueued, so requests run strictly one at a time —
        the queue only ever decouples processing from the *next* fetch,
        it never holds multiple in-flight requests (the question's
        complaint).
        """
        # Collapse the list parameters into the comma-separated strings
        # the API expects.
        objects = ",".join(objects)
        if select_inbound:
            select_inbound = ",".join(select_inbound)
        if select_outbound:
            select_outbound = ",".join(select_outbound)
        # Bounded queue: at most `queue_size` fetched-but-unprocessed
        # responses are buffered.
        queue = asyncio.Queue(maxsize=queue_size)
        # TODO: I want to be able to fill the queue with requests that are already executing
        async def queued_chunks():
            for ids in chunks(entity_ids, chunk_size):
                # This await completes the whole request before the
                # result is enqueued, serializing all fetches.
                res = await self.client.post(urllib.parse.quote("entities:fetchdata"), json={
                    "entityIds": ids,
                    "objects": objects,
                    # NOTE(review): "inbound" is built from select_outbound
                    # but gated on select_inbound (and vice versa below) —
                    # looks like a copy-paste swap; confirm against the API.
                    "inbound": {
                        "linkTypeIds": select_outbound,
                        "objects": objects,
                    } if select_inbound else {},
                    "outbound": {
                        "linkTypeIds": select_inbound,
                        "objects": objects,
                    } if select_outbound else {},
                })
                await queue.put(res)
            # Sentinel marking end-of-stream for the consumer loop.
            await queue.put(None)
        asyncio.create_task(queued_chunks())
        while True:
            res = await queue.get()
            if res is None:
                break
            res.raise_for_status()
            queue.task_done()
            for entity in res.json():
                yield entity
解决方案
与其在入队之前就等待请求完成,不如先用 asyncio.create_task 启动请求,把返回的任务(task)入队,稍后再从队列中按提交顺序等待其结果。
class MyClient:
    async def fetch_entities(
        self,
        entity_ids: List[int],
        objects: Optional[List[str]],
        select_inbound: Optional[List[str]] = None,
        select_outbound: Optional[List[str]] = None,
        queue_size: int = 5,
        chunk_size: int = 500,
    ):
        """
        Fetch entities in chunks of ``chunk_size`` ids.

        Up to ``queue_size`` requests execute concurrently: each chunk's
        POST is started immediately as an ``asyncio.Task`` and the task is
        put on a bounded queue, whose ``maxsize`` provides the back-pressure.
        The consumer awaits the tasks in FIFO order, so results are always
        processed in submission order even when a later request finishes
        first.

        :param entity_ids: ids of the entities to fetch
        :param objects: object selectors; joined into a comma-separated string
        :param select_inbound: optional inbound link-type ids
        :param select_outbound: optional outbound link-type ids
        :param queue_size: maximum number of concurrently executing requests
        :param chunk_size: number of ids per request
        :raises: whatever ``res.raise_for_status()`` raises on an HTTP error
        """
        # Collapse the list parameters into the comma-separated strings
        # the API expects.
        objects = ",".join(objects)
        if select_inbound:
            select_inbound = ",".join(select_inbound)
        if select_outbound:
            select_outbound = ",".join(select_outbound)
        # Bounded queue of *running* tasks: queue.put blocks once
        # `queue_size` requests are in flight, capping concurrency.
        queue = asyncio.Queue(maxsize=queue_size)

        async def queued_chunks():
            for ids in chunks(entity_ids, chunk_size):
                cor = self.client.post(urllib.parse.quote("entities:fetchdata"), json={
                    "entityIds": ids,
                    "objects": objects,
                    # Fixed: "inbound" now uses select_inbound and
                    # "outbound" uses select_outbound (the original had the
                    # two variables swapped relative to their gates).
                    "inbound": {
                        "linkTypeIds": select_inbound,
                        "objects": objects,
                    } if select_inbound else {},
                    "outbound": {
                        "linkTypeIds": select_outbound,
                        "objects": objects,
                    } if select_outbound else {},
                })
                # Start the request immediately and enqueue the *task*.
                # Fixed: the original enqueued the bare coroutine after
                # create_task() had already scheduled it; awaiting that
                # coroutine a second time raises RuntimeError.
                task = asyncio.create_task(cor)
                await queue.put(task)
            # Sentinel: tells the consumer there are no more chunks.
            await queue.put(None)

        # Keep a reference so the producer task is not garbage-collected
        # while it is still running.
        producer = asyncio.create_task(queued_chunks())
        while True:
            task = await queue.get()
            if task is None:
                break
            # FIFO awaiting preserves submission order regardless of which
            # request actually completed first.
            res = await task
            res.raise_for_status()
            queue.task_done()
            for entity in res.json():
                yield entity
推荐阅读
- react-native - 有没有办法在加载后一次在 React-Native 中重新加载屏幕?
- java - 发送 1 个 JSON 和 1 个多表单/数据的 Swagger-UI 问题
- javascript - 如何用电子跨平台录制音视频
- mysql - 在 eloquent 模型中查询
- boto3 - 使用 python SDK 从 S3 下载文件
- active-directory - Liferay 6.2 GA6 - 无法访问 LDAP“userPassword”属性
- c - 为什么 Ctrl+C 在我的终端中不起作用?
- javascript - 没有框架的 NodeJS:findById
- c# - 在 Unity 中,为什么我可以通过按下按钮将相机切换到 WorldView,但在使用人脸管理器的同时无法在开始或更新循环中执行此操作?
- python - 如何在 python Django 中发送连续响应?