python - Python websockets 服务器和 websockets 客户端在运行这两个任务时使用 asyncio 断言错误
问题描述
我有3台机器;甲、乙、丙。
A 正在运行 C 想要连接的 websocket 服务器,但 C 无法直接连接到 A。为了解决这个问题,我想通过机器 B 来“代理”websocket。
A 充当发布者,每隔几秒就会产生新的数据包,我想将这些数据包推送到 C。C 不需要向 A 发送任何内容(尽管将来可能需要)。
我想通过使用 websockets ( https://pypi.org/project/websockets/ ) 模块来实现这个代理。我目前正在尝试在 B 上创建一个服务器来侦听连接并保持与任何客户端的 websocket 连接。然后我想异步运行一个连接到机器 A 的 websocket 客户端。这是一个订阅者连接,从 websocket 连接到 A 的任何更新都应该推送到 C(或任何其他连接到 B 的客户端)。
这是我当前的(最小)实现,其中发生了错误。
sockets = set()
def register(websocket):
sockets.add(websocket)
def unregister(websocket):
sockets.remove(websocket)
async def update_clients(update):
if sockets:
await asyncio.wait([socket.send(update) for socket in sockets])
async def client_server(websocket, path):
register(websocket)
await websocket.send("Welcome!")
async def live_updates():
async with websockets.connect(LIVE_URL) as websocket:
async for update in websocket.recv():
await update_clients(update)
webserver = websockets.serve(client_server, "0.0.0.0", PORT)
loop = asyncio.get_event_loop()
loop.run_until_complete(webserver)
loop.run_until_complete(live_updates)
loop.run_forever()
错误如下
Traceback (most recent call last):
File "/usr/lib/python3.6/asyncio/base_events.py", line 1310, in call_exception_handler
self.default_exception_handler(context)
File "/usr/lib/python3.6/asyncio/base_events.py", line 1282, in default_exception_handler
value = repr(value)
File "/usr/lib/python3.6/asyncio/base_tasks.py", line 15, in _task_repr_info
coro = coroutines._format_coroutine(task._coro)
File "/usr/lib/python3.6/asyncio/coroutines.py", line 276, in _format_coroutine
assert iscoroutine(coro)
AssertionError
解决方案
当我删除第二个代码时run_until_complete(live_updates)
,代码运行没有错误。所以我删除了第一个run_until_complete
,将我的代码与https://websockets.readthedocs.io/en/stable/intro.html中的示例代码进行比较并意识到
run_until_complete(live_updates)
应该run_until_complete(live_updates())
我会留下这个问题,以防有人遇到同样的错误,因为错误消息会让初学者感到困惑。
推荐阅读
- javascript - 自定义字体未加载到 Material UI 的自定义主题上
- jmeter - log4j2.xml 设置以禁用 JMeter 中的日志记录
- nestjs - 如何启用 NestJs swagger 4.x 插件
- laravel - Vue 组件未在其他刀片中显示
- python - 如何在 Python 中遍历嵌套的 JSON
- google-app-engine - 如何从 Google App Engine 中的单个 app.yaml 文件部署 Java 应用程序的多个版本
- amazon-web-services - 有没有一种方法可以限制某个区域的 AWS CloudFront 分配以节省成本?
- exception - 如何以编程方式检测 dotnet.exe 何时遇到第二次机会异常?
- visual-studio - 如果我想将会话超时增加到 60 分钟,是否足以在 Web.config 中进行更改,或者还需要调整 IIS?
- c++ - 仅当父循环的所有迭代都为真时,如何执行嵌套的 if 语句?