首页 > 解决方案 > 同时获取队列中的请求

问题描述

我编写的代码允许我在处理前一个数据块的同时开始从 API 获取下一个数据块。

我希望在任何给定时刻始终同时获取多达 5 个块；但是，即使队列中最后一个请求先于其他请求完成，返回的数据也应始终按正确的顺序处理。

如何更改我的代码以实现这一点?

class MyClient:
    async def fetch_entities(
        self,
        entity_ids: List[int],
        objects: Optional[List[str]],
        select_inbound: Optional[List[str]] = None,
        select_outbound: Optional[List[str]] = None,
        queue_size: int = 5,
        chunk_size: int = 500,
    ):
        """
        Fetch entities in chunks and yield them one by one.

        While one chunk of data is being processed the next one can
        already be fetched. In other words: data processing does not
        block data fetching.

        :param entity_ids: ids of the entities to fetch
        :param objects: object names; joined into one comma-separated string
        :param select_inbound: inbound link type ids (optional)
        :param select_outbound: outbound link type ids (optional)
        :param queue_size: max number of fetched-but-unprocessed responses
        :param chunk_size: number of entity ids per request
        :raises httpx.HTTPStatusError: via ``raise_for_status`` on a bad response
        """
        objects = ",".join(objects)
        if select_inbound:
            select_inbound = ",".join(select_inbound)

        if select_outbound:
            select_outbound = ",".join(select_outbound)

        queue = asyncio.Queue(maxsize=queue_size)

        async def queued_chunks():
            # Producer: fetch each chunk and hand the response to the consumer.
            for ids in chunks(entity_ids, chunk_size):
                res = await self.client.post(urllib.parse.quote("entities:fetchdata"), json={
                    "entityIds": ids,
                    "objects": objects,
                    # BUG FIX: "inbound" must carry select_inbound and
                    # "outbound" select_outbound — the original had the
                    # two variables swapped.
                    "inbound": {
                        "linkTypeIds": select_inbound,
                        "objects": objects,
                    } if select_inbound else {},
                    "outbound": {
                        "linkTypeIds": select_outbound,
                        "objects": objects,
                    } if select_outbound else {},
                })
                await queue.put(res)
            # Sentinel telling the consumer that all chunks were fetched.
            await queue.put(None)

        asyncio.create_task(queued_chunks())

        # Consumer: drain the queue in FIFO order and yield entities.
        while True:
            res = await queue.get()
            if res is None:
                break
            res.raise_for_status()
            queue.task_done()
            for entity in res.json():
                yield entity

标签: python, python-asyncio, httpx

解决方案


与其在入队之前等待协程,不如将协程入队并稍后等待

class MyClient:
    async def fetch_entities(
        self,
        entity_ids: List[int],
        objects: Optional[List[str]],
        select_inbound: Optional[List[str]] = None,
        select_outbound: Optional[List[str]] = None,
        queue_size: int = 5,
        chunk_size: int = 500,
    ):
        """
        Fetch entities in chunks with up to ``queue_size`` requests in flight.

        Requests are started eagerly as tasks and their *tasks* (not their
        results) are queued, so several requests run concurrently while
        results are still consumed strictly in submission order.

        :param entity_ids: ids of the entities to fetch
        :param objects: object names; joined into one comma-separated string
        :param select_inbound: inbound link type ids (optional)
        :param select_outbound: outbound link type ids (optional)
        :param queue_size: max number of concurrently executing requests
        :param chunk_size: number of entity ids per request
        :raises httpx.HTTPStatusError: via ``raise_for_status`` on a bad response
        """
        objects = ",".join(objects)
        if select_inbound:
            select_inbound = ",".join(select_inbound)

        if select_outbound:
            select_outbound = ",".join(select_outbound)

        queue = asyncio.Queue(maxsize=queue_size)

        async def queued_chunks():
            for ids in chunks(entity_ids, chunk_size):
                cor = self.client.post(urllib.parse.quote("entities:fetchdata"), json={
                    "entityIds": ids,
                    "objects": objects,
                    # BUG FIX: "inbound" must carry select_inbound and
                    # "outbound" select_outbound — the original had the
                    # two variables swapped.
                    "inbound": {
                        "linkTypeIds": select_inbound,
                        "objects": objects,
                    } if select_inbound else {},
                    "outbound": {
                        "linkTypeIds": select_outbound,
                        "objects": objects,
                    } if select_outbound else {},
                })
                # Start the request immediately; queue.put() blocks once
                # queue_size tasks are pending, which caps concurrency.
                task = asyncio.create_task(cor)
                # BUG FIX: enqueue the Task, not the bare coroutine. The
                # task already owns the coroutine, so awaiting `cor` again
                # in the consumer would raise "cannot reuse already
                # awaited coroutine". Awaiting the Task is safe.
                await queue.put(task)
            # Sentinel telling the consumer that all chunks were submitted.
            await queue.put(None)

        asyncio.create_task(queued_chunks())

        # Consume tasks in FIFO order: results are processed in the order
        # requests were submitted, regardless of completion order.
        while True:
            task = await queue.get()
            if task is None:
                break
            res = await task
            res.raise_for_status()
            queue.task_done()
            for entity in res.json():
                yield entity

推荐阅读