python - 在 Tornado 的阻塞上下文中调用异步函数
问题描述
我想在 Tornado 框架中实现一个基于 Web 套接字的服务。当用户关闭 Web 套接字时,我想通知其他用户。但是,on_close
显然是一个阻塞函数,我的_broadcast(str) -> None
函数是异步的。
我怎么能调用这个函数呢?
from tornado import websocket
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class SocketHandler(websocket.WebSocketHandler):
async def open(self, *args, conns, **kwargs):
logger.info(f"Opened a new connection to client {id(self)}")
self._conns = conns
async def on_message(self, message):
logger.info(f"Client {id(self)} sent message: {message}")
await self._broadcast(message)
def on_close(self):
logger.info(f"Client {id(self)} has left the scene")
self._conns.remove(self)
self._broadcast("something") # TODO
async def _broadcast(self, msg):
for conn in self._conns:
try:
await conn.write_message(msg)
except websocket.WebSocketClosedError:
pass
app = web.Application([
(r'/ws', SocketHandler)
])
if __name__ == '__main__':
app.listen(9000)
ioloop.IOLoop.instance().start()
解决方案
您正在寻找的简单解决方案是asyncio.create_task
在调用协程时使用:
def on_close(self):
logger.info(f"Client {id(self)} has left the scene")
self._conns.remove(self)
asyncio.create_task(self._broadcast("something"))
(这个函数的传统 Tornado 版本是tornado.gen.convert_yielded
,但是现在 Tornado 和 asyncio 已经集成,没有理由不将 asyncio 版本用于原生协程)
但是对于这个特殊的问题,await
在你的_broadcast
函数中使用并不理想。等待 awrite_message
用于提供流量控制,但create_task
对await
. (write_message
这是相当不寻常的,因为它完全支持使用和不使用它来调用它await
)。事实上,它对错误的事情施加了背压——一个缓慢的连接会减慢向其后所有其他连接的通知。
所以在这种情况下,我建议制作_broadcast
一个常规的同步函数:
def _broadcast(self, msg):
for conn in self._conns:
try:
conn.write_message(msg)
except websocket.WebSocketClosedError:
pass
如果您希望能够更好地控制内存使用(通过 提供的流量控制await write_message
),您将需要一个更复杂的解决方案,可能涉及每个连接的有界队列(在 中on_close
,用于put_nowait
将消息添加到每个连接的队列中,然后有一个任务从队列中读取并用await write_message
)写入消息
推荐阅读
- hugo - 任意页内内容片段
- database - 如何在 sqlite3 中正确连接这些表
- javascript - 在 ionic 5 中更改底部导航栏颜色
- reactjs - 获取为电影创建的翻译列表
- javascript - 在 Javascript Fetch 上添加 Cookie
- python - 编写一个程序,打印出超过六个测验分数的学生的姓名
- flutter - 水平 ListView.builder 上的 RefreshIndicator
- google-cloud-platform - 如何使用 NodeJs 客户端而不是存储对象获取谷歌云存储的公共 URL 列表
- audio - 使用ffmpeg将wav批量转换为m4a文件
- angular - 我使用 angular 11 创建了表单