首页 > 解决方案 > 如何使用 Python asyncio 在 asyncpg API 上实现同步外观?

问题描述

想象一个异步aiohttpWeb 应用程序,它由通过连接的 Postgresql 数据库支持asyncpg并且不执行其他 I/O。我怎样才能有一个托管应用程序逻辑的中间层,而不是异步的?(我知道我可以简单地使一切异步 - 但想象我的应用程序有大量的应用程序逻辑,仅受数据库 I/O 约束,我无法触及它的所有内容)。

伪代码:

async def handler(request):
    # call into layers over layers of application code, that simply emits SQL
    ...

def application_logic():
    ...
    # This doesn't work, obviously, as await is a syntax
    # error inside synchronous code.
    data = await asyncpg_conn.execute("SQL")
    ...
    # What I want is this:
    data = asyncpg_facade.execute("SQL")
    ...

如何asyncpg构建一个允许应用程序逻辑进行数据库调用的同步外观?async.run()在这种情况下,像 using或等一样浮动的配方asyncio.run_coroutine_threadsafe()在这种情况下不起作用,因为我们来自已经异步的上下文。我认为这不可能,因为已经有一个事件循环原则上可以运行asyncpg协程。

await额外的问题:使内部同步成为语法错误的设计原理是什么?await允许来自协程的任何上下文不是很有用吗,所以我们有简单的方法来分解功能构建块中的应用程序?

编辑额外奖励:除了保罗的非常好的答案,它留在“安全区”内,我会对避免阻塞主线程的解决方案感兴趣(导致更多gevent-ish)。另请参阅我对保罗回答的评论......

标签: pythonpython-asyncioaiohttpasyncpg

解决方案


您需要创建一个辅助线程来运行您的异步代码。您使用自己的事件循环初始化辅助线程,该循环永远运行。通过调用 run_coroutine_threadsafe() 并在返回的对象上调用 result() 来执行每个异步函数。这是 concurrent.futures.Future 的一个实例,它的 result() 方法在协程的结果从辅助线程准备好之前不会返回。

然后,您的主线程实际上是调用每个异步函数,就好像它是一个同步函数一样。在每个函数调用完成之前,主线程不会继续。顺便说一句,您的同步功能是否实际上在事件循环上下文中运行并不重要。

对 result() 的调用当然会阻塞主线程的事件循环。如果您想获得从同步代码运行异步函数的效果,这是无法避免的。

不用说,这是一件丑陋的事情,它暗示了错误的程序结构。但是您正在尝试转换旧程序,这可能会有所帮助。

import asyncio
import threading
from datetime import datetime

def main():
    def thr(loop):
        asyncio.set_event_loop(loop)
        loop.run_forever()
    
    loop = asyncio.new_event_loop()
    t = threading.Thread(target=thr, args=(loop, ), daemon=True)
    t.start()

    print("Hello", datetime.now())
    t1 = asyncio.run_coroutine_threadsafe(f1(1.0), loop).result()
    t2 = asyncio.run_coroutine_threadsafe(f1(2.0), loop).result()
    print(t1, t2)
 

if __name__ == "__main__":
    main()

>>> Hello 2021-10-26 20:37:00.454577
>>> Hello 1.0 2021-10-26 20:37:01.464127
>>> Hello 2.0 2021-10-26 20:37:03.468691
>>> 1.0 2.0

推荐阅读