首页 > 解决方案 > Flask 如何通过连接的客户端分发 RabbitMQ 消息

问题描述

我有一个 RabbitMQ 服务器和一个消耗 RabbitMQ 队列的 Flask 服务器。消息由另一台服务器发布。客户端使用SocketIO连接到 Flask 服务器,并在连接时将会话 ID 存储在同步队列中。消息消费在一个单独的线程中完成。当消息到达时,回调函数获取队列中的第一项并将 SocketIO 消息发送到相应的客户端。

我试图实现这一切,但是当消息到达并触发回调函数时,队列是空的,而在消息到达之前,我将客户端连接到它并验证确实它的会话 ID 已放入队列。

我的猜测是消费者线程使用线程启动时为空的队列副本,并且在客户端连接后在 Flask 线程中更新队列后,此更改不会应用于消费者线程。

这是我的代码:

app = Flask(__name__)
socketio = SocketIO(app, ping_interval=5, async_mode='threading')
socketio.init_app(app, cors_allowed_origins="*")
client_queue = queue.Queue()

@socketio.on('connect')
def client_connected():
    client_queue.put(request.sid, block=False)

def callback(ch, method, properties, body):
    try:
        selected_client = client_queue.get(block=True, timeout=5)
    except queue.Empty as e:
        print(e)
        print("No clients")

connection = pika.BlockingConnection(pika.ConnectionParameters(host=os.getenv("RABBITMQ")))

channel = connection.channel()

channel.queue_declare(queue=os.getenv("QUEUE_NAME"), durable=True)
channel.basic_consume(queue=os.getenv("QUEUE_NAME"), on_message_callback=callback)


if __name__ == '__main__':
    socketio.start_background_task(target=channel.start_consuming)
    socketio.run(app, debug=True)

因此,当我运行此代码时,使用 SocketIO 客户端连接到服务器,然后通过 RabbitMQ 发送消息,这将导致空队列预期(不打印客户端)。我在这里做错了什么?

标签: pythonmultithreadingflaskrabbitmqflask-socketio

解决方案


推荐阅读