首页 > 解决方案 > 如何实现异步生成器?

问题描述

我订阅了一个 MQ 队列。每次收到消息时,我都会向它传递一个函数,然后对其执行一些耗时的 I/O 操作。
问题是一切都是连续发生的。

一个请求进来,它接收请求,通过调用函数执行操作,然后接收下一个请求。

我想异步执行此操作,以便可以以异步方式处理多个请求。

results = []
queue = queue.subscribe(name)
async for message in queue:
    yield my_funcion(message)

最大的问题是 my_function 很慢,因为它调用外部 Web 服务,我希望我的代码同时处理其他消息。
我试图在上面实现它,但它不起作用!我不确定如何在这里实现异步。

我无法创建任务,因为我不知道会收到多少请求。这是我订阅的 MQ。我遍历每条消息并执行一个动作。在对下一条消息执行操作之前,我不希望该功能完成。我希望它异步发生。

标签: pythonpython-3.xasync-awaitpython-asyncio

解决方案


如果我理解您的请求,您需要的是请求处理程序填充的队列,并且您从需要对结果执行某些操作的代码中读取该队列。

如果您坚持使用异步迭代器,则可以直接使用生成器来公开队列的内容。例如:

def make_asyncgen():
    queue = asyncio.Queue(1)
    async def feed(item):
        await queue.put(item)
    async def exhaust():
        while True:
            item = await queue.get()
            yield item
    return feed, exhaust()

make_asyncgen返回两个对象:一个异步函数和一个异步生成器。两者的连接方式是,当您使用项目调用函数时,生成器会发出该项目。例如:

import random, asyncio

# Emulate a server that takes some time to process each message,
# and then provides a result. Here it takes an async function
# that it will call with the result.
async def serve(server_ident, on_message):
    while True:
        await asyncio.sleep(random.uniform(1, 5))
        await on_message('%s %s' % (server_ident, random.random()))

async def main():
    # create the feed function, and the generator
    feed, get = make_asyncgen()

    # subscribe to serve several requests in parallel
    asyncio.create_task(serve('foo', feed))
    asyncio.create_task(serve('bar', feed))
    asyncio.create_task(serve('baz', feed))

    # process results from all three servers as they arrive
    async for msg in get:
        print('received', msg)

asyncio.run(main())

推荐阅读