python - 如何实现异步生成器?
问题描述
我订阅了一个 MQ 队列。每次收到消息时,我都会向它传递一个函数,然后对其执行一些耗时的 I/O 操作。
问题是一切都是连续发生的。
一个请求进来,它接收请求,通过调用函数执行操作,然后接收下一个请求。
我想异步执行此操作,以便可以以异步方式处理多个请求。
results = []
queue = queue.subscribe(name)
async for message in queue:
yield my_funcion(message)
最大的问题是 my_function 很慢,因为它调用外部 Web 服务,我希望我的代码同时处理其他消息。
我试图在上面实现它,但它不起作用!我不确定如何在这里实现异步。
我无法创建任务,因为我不知道会收到多少请求。这是我订阅的 MQ。我遍历每条消息并执行一个动作。在对下一条消息执行操作之前,我不希望该功能完成。我希望它异步发生。
解决方案
如果我理解您的请求,您需要的是请求处理程序填充的队列,并且您从需要对结果执行某些操作的代码中读取该队列。
如果您坚持使用异步迭代器,则可以直接使用生成器来公开队列的内容。例如:
def make_asyncgen():
queue = asyncio.Queue(1)
async def feed(item):
await queue.put(item)
async def exhaust():
while True:
item = await queue.get()
yield item
return feed, exhaust()
make_asyncgen
返回两个对象:一个异步函数和一个异步生成器。两者的连接方式是,当您使用项目调用函数时,生成器会发出该项目。例如:
import random, asyncio
# Emulate a server that takes some time to process each message,
# and then provides a result. Here it takes an async function
# that it will call with the result.
async def serve(server_ident, on_message):
while True:
await asyncio.sleep(random.uniform(1, 5))
await on_message('%s %s' % (server_ident, random.random()))
async def main():
# create the feed function, and the generator
feed, get = make_asyncgen()
# subscribe to serve several requests in parallel
asyncio.create_task(serve('foo', feed))
asyncio.create_task(serve('bar', feed))
asyncio.create_task(serve('baz', feed))
# process results from all three servers as they arrive
async for msg in get:
print('received', msg)
asyncio.run(main())
推荐阅读
- asp.net - 在 ASPX 页面中循环会话集合变量
- java - 为什么 Java DataOutputStream 类在所有输出相同的东西时提供 write() , writeInt()
- google-cloud-platform - 无法从 GCP pubsub 读取
- arduino - 用电位器控制步进电机速度
- javascript - 到日期应该大于 jquery 中的日期
- java - 速度模板替代方案
- python - 如何在 y 轴的 x 轴上找到一个值?
- java - 使用 Selenium 在所有应用服务器上加载缓存
- kubernetes - kubernetes:从 pod 中读取 pod 的秘密
- sql-server - 列 'c.number' 在选择列表中无效,因为它不包含在聚合函数或 GROUP BY 子句中