首页 > 解决方案 > 如何将 aio-pika 与 FastAPI 一起使用?

问题描述

我想在异步工作的同时使用 FastAPI 和 aio-pika 创建一个 REST 服务。对于其他异步数据库驱动程序,我可以在启动时创建客户端,然后在路由处理程序中获取它们。例如,对于电机,我会声明简单的连接管理器:

from motor.motor_asyncio import AsyncIOMotorClient


class Database:
    client: AsyncIOMotorClient = None


db = Database()


async def connect_to_mongo():
    db.client = AsyncIOMotorClient("mongo:27017")


async def close_mongo_connection():
    db.client.close()


async def get_mongo_client() -> AsyncIOMotorClient:
    return db.client

然后添加几个处理程序:

app.add_event_handler("startup", connect_to_mongo)
app.add_event_handler("shutdown", close_mongo_connection)

然后只是用来get_mongo_client给我的处理程序拿一个。

这里的问题是aio-pika需要asyncio循环才能运行。这是文档中的一个示例:

connection = await aio_pika.connect_robust(
        "amqp://guest:guest@127.0.0.1/", loop=loop
    )

并且使用 FastAPI 我没有 asyncio 循环。有什么方法可以将它与示例中的界面一起使用吗?我可以使用创建新循环asyncio.get_event_loop()并将其传递给connect_robust而不在任何地方真正使用它吗?像这样:

connection = await aio_pika.connect_robust(
            "amqp://guest:guest@127.0.0.1/", loop=asyncio.get_event_loop()
        )

标签: pythonasynchronousfastapi

解决方案


好的,所以,根据文档,我可以使用connect而不是connect_robust

connection = await aio_pika.connect(
        "amqp://guest:guest@127.0.0.1/"
    )

推荐阅读