首页 > 解决方案 > 确保使用 flask-socketio、redis 发送套接字消息

问题描述

我有一个在多个 pod 上运行的 flask-socketio 服务器,使用 redis 作为消息队列。我想确保来自外部进程的发射在 100% 的时间内到达目的地,或者知道它们何时失败。

当进程 A 向连接到进程 B 的套接字发出事件时,该事件将通过消息队列传递给进程 B,再传递给客户端。有什么办法可以拦截进程 B 上的传出发射吗?理想情况下,我会在几秒钟后使用工作人员检查消息是否到达客户端(通过客户端发出的确认事件),否则它将再次发出。

此代码在进程 A 上运行:

@app.route('/ex')
def ex_route():
    socketio.emit('external', {'text': f'sender: {socket.gethostname()}, welcome!'}, room='some_room')
    return jsonify(f'sending message to room "some_room" from {socket.gethostname()}')

这是进程 A 的输出

INFO:socketio.server:emitting event "external" to some_room [/]
INFO:geventwebsocket.handler:127.0.0.1 - - [2019-01-11 13:33:44] "GET /ex HTTP/1.1" 200 177 0.003196

这是进程 B 的输出

INFO:engineio.server:9aab2215a0da4816a45e3fdc1e449fce: Sending packet MESSAGE data 2["external",{"text":"sender: *******, welcome!"}]

标签: pythonredisflask-socketio

解决方案


使用 IPC 不是很健壮,尤其是在服务器收到大量请求的情况下,可能会出现您收到消息并且不重新翻译它的问题,这很重要。

使用 celery 或 zmq 或 redis 本身进行互连。最自然的是使用 Miguel 提到的 socketio 本身,因为它已经在等待具有环境的请求并且可以随时发出。

我在线程上使用了 greenlet hack——greenlet 比线程轻,并且在相同的环境中运行,允许它在您的主线程以非阻塞模式等待套接字时发送消息。基本上你写一个线程,然后通过monkeypatching将eventlet或gevent应用于整个代码,线程变成一个greenlet——一个中间函数调用。你在它上面设置了一个睡眠,这样它就不会占用所有资源并且你有你的发送者,因为greenlets很容易共享环境,它们不受io的约束,只是cpu(这对于Python中的线程来说是相同的,但是greenlets更加轻量级,因为根本没有操作系统级别的上下文更改)。

但是一旦 CPU 负载增加,我就切换到客户端/服务器。灌输 IPC 需要从头开始大量重写。


推荐阅读