python - 如何将 aio-pika 与 FastAPI 一起使用?
问题描述
我想在异步工作的同时使用 FastAPI 和 aio-pika 创建一个 REST 服务。对于其他异步数据库驱动程序,我可以在启动时创建客户端,然后在路由处理程序中获取它们。例如,对于电机,我会声明简单的连接管理器:
from motor.motor_asyncio import AsyncIOMotorClient
class Database:
client: AsyncIOMotorClient = None
db = Database()
async def connect_to_mongo():
db.client = AsyncIOMotorClient("mongo:27017")
async def close_mongo_connection():
db.client.close()
async def get_mongo_client() -> AsyncIOMotorClient:
return db.client
然后添加几个处理程序:
app.add_event_handler("startup", connect_to_mongo)
app.add_event_handler("shutdown", close_mongo_connection)
然后只是用来get_mongo_client
给我的处理程序拿一个。
这里的问题是aio-pika
需要asyncio
循环才能运行。这是文档中的一个示例:
connection = await aio_pika.connect_robust(
"amqp://guest:guest@127.0.0.1/", loop=loop
)
并且使用 FastAPI 我没有 asyncio 循环。有什么方法可以将它与示例中的界面一起使用吗?我可以使用创建新循环asyncio.get_event_loop()
并将其传递给connect_robust
而不在任何地方真正使用它吗?像这样:
connection = await aio_pika.connect_robust(
"amqp://guest:guest@127.0.0.1/", loop=asyncio.get_event_loop()
)
解决方案
好的,所以,根据文档,我可以使用connect
而不是connect_robust
:
connection = await aio_pika.connect(
"amqp://guest:guest@127.0.0.1/"
)
推荐阅读
- ruby-on-rails - 如何为以下代码创建 RSpec 测试
- keras - Keras 学习乘以二的问题
- c++ - 如果未输入 getline 分隔符,推荐一种引发异常的方法?
- python-3.x - 如何运行具有包含换行符的参数的进程?
- java - 我想在什么时候输入一个可选的“错误号码。再试一次”
- java - 如何打印出具有缩进的代码以实际具有缩进?
- django - Django 模型数据过滤器
- python-3.x - xgboost plot_tree(model) - 修复 TypeError: super(type, obj) when Running
- wpf - 是否可以通过 ControlTemplate 中的 xaml 绑定 PolyLineSegment.Points?
- ios - 输入 UITextField 后 UIView 消失?