首页 > 解决方案 > 在 Tornado 的阻塞上下文中调用异步函数

问题描述

我想在 Tornado 框架中实现一个基于 Web 套接字的服务。当用户关闭 Web 套接字时,我想通知其他用户。但是,on_close显然是一个阻塞函数,我的_broadcast(str) -> None函数是异步的。

我怎么能调用这个函数呢?

from tornado import websocket

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class SocketHandler(websocket.WebSocketHandler):
    async def open(self, *args, conns, **kwargs):
        logger.info(f"Opened a new connection to client {id(self)}")
        self._conns = conns

    async def on_message(self, message):
        logger.info(f"Client {id(self)} sent message: {message}")
        await self._broadcast(message)

    def on_close(self):
        logger.info(f"Client {id(self)} has left the scene")
        self._conns.remove(self)
        self._broadcast("something")  # TODO

    async def _broadcast(self, msg): 
        for conn in self._conns: 
            try:
                await conn.write_message(msg)
            except websocket.WebSocketClosedError:
                pass


app = web.Application([
    (r'/ws', SocketHandler)
])

if __name__ == '__main__':
    app.listen(9000)
    ioloop.IOLoop.instance().start()

标签: pythonasync-awaittornado

解决方案


您正在寻找的简单解决方案是asyncio.create_task在调用协程时使用:

def on_close(self):
    logger.info(f"Client {id(self)} has left the scene")
    self._conns.remove(self)
    asyncio.create_task(self._broadcast("something"))

(这个函数的传统 Tornado 版本是tornado.gen.convert_yielded,但是现在 Tornado 和 asyncio 已经集成,没有理由不将 asyncio 版本用于原生协程)

但是对于这个特殊的问题,await在你的_broadcast函数中使用并不理想。等待 awrite_message用于提供流量控制,但create_taskawait. (write_message这是相当不寻常的,因为它完全支持使用和不使用它来调用它await)。事实上,它对错误的事情施加了背压——一个缓慢的连接会减慢向其后所有其他连接的通知。

所以在这种情况下,我建议制作_broadcast一个常规的同步函数:

def _broadcast(self, msg): 
    for conn in self._conns: 
        try:
            conn.write_message(msg)
        except websocket.WebSocketClosedError:
            pass

如果您希望能够更好地控制内存使用(通过 提供的流量控制await write_message),您将需要一个更复杂的解决方案,可能涉及每个连接的有界队列(在 中on_close,用于put_nowait将消息添加到每个连接的队列中,然后有一个任务从队列中读取并用await write_message)写入消息


推荐阅读