python - Python socket.io 在本地发出一个事件
问题描述
我想发出一个本地事件,而不是使用来自服务器的事件。基本上,本地调用事件。原因是处理数据,然后可以由另一个函数使用。
相关问题:
socket.io 客户端可以在本地发出事件吗?- 这仅支持 node.js,不支持它的 python 实现。
代码:
await self.sio.connect(SOCKET_IP, namespaces=['/'], transports=['websocket'])
await sio.emit("connection")
await sio.emit('authentication', {'token': new_token})
@sio.event
async def auth_err(data):
authErr = True
await sio.disconnect()
@sio.event
async def alive_status(data):
await sio.emit("alive_status_return", "alive")
@sio.on('connected')
async def on_connect(data):
print("Connected")
我知道肯定sio.emit()
会广播到我不想要的服务器。
看着做,sio.call()
但最终付出await sio.call("on_new_data")
Sending packet MESSAGE data 21["on_new_data"]
然后函数没有得到数据
@sio.event
async def on_new_data():
print("2")
解决方案
我最终不得不为此在asyncio_client.py
python socket.io 中编写一个自定义函数
async def _call_event(self, event, namespace, data=None):
namespace = namespace or '/'
self.logger.info('Received custom event "%s" [%s]', event, namespace)
try:
if data:
r = await self._trigger_event(event, namespace, data)
else:
r = await self._trigger_event(event, namespace)
except Exception as e:
print(traceback.format_exc())
推荐阅读
- arrays - 如何在 Flutter 中解析这个 JSON 数组?
- python - 使用变量时的Python SQLite3绑定错误
- rust - 在运行时查找结构的总大小
- python - 如何用另一个数据框中的值替换数据框中的缺失值?
- python - 返回 postgres 查询时 Psycopg2 无效的 json
- ubuntu - 是否可以仅在命令行上启动 Ubuntu 时运行 JavaFX?
- dask - Dask One 热编码器 handle_unknown="ignore",可以解决吗?
- angular - 可观察到的下一个后的角度回调
- java - 多次按下按钮时如何使声音重叠?
- python - PySpark - 有没有办法迭代多列并用列的最大数(+1)填充 NA?