python - 确保使用 flask-socketio、redis 发送套接字消息
问题描述
我有一个在多个 pod 上运行的 flask-socketio 服务器,使用 redis 作为消息队列。我想确保来自外部进程的发射在 100% 的时间内到达目的地,或者知道它们何时失败。
当进程 A 向连接到进程 B 的套接字发出事件时,该事件将通过消息队列传递给进程 B,再传递给客户端。有什么办法可以拦截进程 B 上的传出发射吗?理想情况下,我会在几秒钟后使用工作人员检查消息是否到达客户端(通过客户端发出的确认事件),否则它将再次发出。
此代码在进程 A 上运行:
@app.route('/ex')
def ex_route():
socketio.emit('external', {'text': f'sender: {socket.gethostname()}, welcome!'}, room='some_room')
return jsonify(f'sending message to room "some_room" from {socket.gethostname()}')
这是进程 A 的输出
INFO:socketio.server:emitting event "external" to some_room [/]
INFO:geventwebsocket.handler:127.0.0.1 - - [2019-01-11 13:33:44] "GET /ex HTTP/1.1" 200 177 0.003196
这是进程 B 的输出
INFO:engineio.server:9aab2215a0da4816a45e3fdc1e449fce: Sending packet MESSAGE data 2["external",{"text":"sender: *******, welcome!"}]
解决方案
使用 IPC 不是很健壮,尤其是在服务器收到大量请求的情况下,可能会出现您收到消息并且不重新翻译它的问题,这很重要。
使用 celery 或 zmq 或 redis 本身进行互连。最自然的是使用 Miguel 提到的 socketio 本身,因为它已经在等待具有环境的请求并且可以随时发出。
我在线程上使用了 greenlet hack——greenlet 比线程轻,并且在相同的环境中运行,允许它在您的主线程以非阻塞模式等待套接字时发送消息。基本上你写一个线程,然后通过monkeypatching将eventlet或gevent应用于整个代码,线程变成一个greenlet——一个中间函数调用。你在它上面设置了一个睡眠,这样它就不会占用所有资源并且你有你的发送者,因为greenlets很容易共享环境,它们不受io的约束,只是cpu(这对于Python中的线程来说是相同的,但是greenlets更加轻量级,因为根本没有操作系统级别的上下文更改)。
但是一旦 CPU 负载增加,我就切换到客户端/服务器。灌输 IPC 需要从头开始大量重写。
推荐阅读
- java - 如何在 HiveMQ Client 中获取客户端连接信息?(MQTT)
- firebase - firebase 函数:shell 不模拟调用之间的内存状态
- php - php 将更多数组保存到 MySQL
- jquery - CSS动画:每个单词的淡入+变换动画不起作用
- c# - Lambdas 和条件:任何人都可以看到 C# 编译器无法解决以下问题的原因吗?
- ruby-on-rails - Rails 在两个应用程序之间共享 Active Storage(服务 AWS)
- python - 对音频流使用多处理
- python - Wrong conversion of datetime timezone aware object in template Django
- qt - 图表中 LineSeries 的中继器
- r - 我们如何找到数据的平滑函数?