首页 > 解决方案 > 在 Python FastAPI 应用程序中启动异步后台守护程序

问题描述

我正在使用 FastAPI 为分析系统构建异步后端。问题是它必须: a) 监听 API 调用并随时可用;b) 定期执行数据收集任务(解析数据并将其保存到数据库中)。

我编写了这个函数来充当守护进程:

    async def start_metering_daemon(self) -> None:
        """sets a never ending task for metering"""
        while True:
            delay: int = self._get_delay()  # delay in seconds until next execution
            await asyncio.sleep(delay)
            await self.gather_meterings()  # perfom data gathering

我想要实现的是,当应用程序启动时,它还会将此守护程序函数添加到主事件循环中,并在有时间时执行它。但是,我一直无法找到适合任务规模的合适解决方案(添加 Celery 和类似的东西是一种矫枉过正)。

我尝试了以下方法来实现这一点,但都没有奏效:

@app.on_event("startup")
async def startup_event() -> None:
    """tasks to do at server startup"""
    await Gatherer().start_metering_daemon()

结果:由于线程被阻塞,服务器无法启动

@app.on_event("startup")
async def startup_event() -> None:
    """tasks to do at server startup"""
    fastapi.BackgroundTasks().add_task(Gatherer().start_metering_daemon)

结果:任务从未像日志中观察到的那样执行

@app.on_event("startup")
async def startup_event() -> None:
    """tasks to do at server startup"""
    fastapi.BackgroundTasks().add_task(asyncio.run, Gatherer().start_metering_daemon())

结果:和上一个一样

@app.on_event("startup")
async def startup_event() -> None:
    """tasks to do at server startup"""
    threading.Thread(target=asyncio.run, args=(Gatherer().start_metering_daemon(),)).start()

结果:这个可行,但 a) 没有意义;b) 为 N 个 Uvicorn 工作人员生成 N 个相同的线程,这些工作人员都将相同的数据写入数据库 N 次。

我现在没有解决方案。我很确定我的问题必须有一个解决方案,因为这对我来说看起来很微不足道,但我找不到。

如果您想要更多上下文,这里是我所指的项目的回购

标签: pythonasync-awaitpython-asynciofastapi

解决方案


尝试

@app.on_event("startup")
async def startup_event() -> None:
    """tasks to do at server startup"""
    asyncio.create_task(Gatherer().start_metering_daemon())

推荐阅读