python - Flask SocketIo:测试事件的发射
问题描述
我正在构建一个使用 Flask SocketIO 的应用程序。为了准确了解 StocketIO 包的工作原理,我使用此工作流创建了一个简单的测试:
- HTTP Get 请求触发我的偶数
- 当我收到这个请求时,我使用带有函数 test() 的 SocketIO 发出事件
- 我尝试使用函数 handle_my_custom_event() 读取此事件发出。
这是我的代码:
app = Flask(__name__)
api = Api(app)
socket = SocketIO(app, logger=True, engineio_logger=True)
@app.route('/')
def test():
data = {'city': "new york", 'number': 10}
json_data = json.dumps(data)
socket.emit("test_event", json_data)
return {"message": "message send correctly"}, 200
@socket.on('test_event')
def handle_my_custom_event(json):
print('received json: ' + str(json))
return 'one', 2
if __name__ == '__main__':
socket.run(app)
我对本地主机执行 HTTP 请求:http: //127.0.0.1 :5000 / 这会触发我的函数测试,它会发出我的事件(在终端中,我有消息“向所有 [/] 发出事件“test_event”,所以事件是发射)
通常,我的handle_my_custom_event应该被这个事件触发,因为它监听了“test_event”,但什么也没发生,终端上也没有打印任何东西(我没有“received json: xx”)
所以我想这个事件不是很好的方式,但在那种情况下,如果你知道为什么,它真的会帮助我
谢谢
解决方案
您误解了 Socket.IO 的工作原理。当您调用emit()
服务器时,您正在向您的客户端发送信息。您发出的此事件的事件处理程序test_event
应在您的客户端中实现。
当客户端发出时,服务器会接收它,因此在这种情况下,您将在服务器中实现一个事件处理程序。
推荐阅读
- reactjs - 在行悬停时显示蚂蚁设计弹出框
- android - http包中的respnse.body没有返回所需的内容
- python - 有没有办法阻止自动下载组织者重新定位下载文件?
- sql - 如何从sql中的每个组中找到最小值?
- javascript - Firebase v9 Firestore 在 React JS 中按 id 创建文档
- python - 来自守护进程的错误响应:OCI 运行时创建失败:container_linux.go:380:启动容器进程导致:exec:“python”:
- delphi - 是否可以在 Delphi 中声明具有通用值类型的 TDictionary?
- filter - WP过滤器/动作
- discord - Discord.js - 音频资源无法正常播放
- ruby-on-rails - Ruby 将整数拆分为整数数组