python - Flask 如何通过连接的客户端分发 RabbitMQ 消息
问题描述
我有一个 RabbitMQ 服务器和一个消耗 RabbitMQ 队列的 Flask 服务器。消息由另一台服务器发布。客户端使用SocketIO连接到 Flask 服务器,并在连接时将会话 ID 存储在同步队列中。消息消费在一个单独的线程中完成。当消息到达时,回调函数获取队列中的第一项并将 SocketIO 消息发送到相应的客户端。
我试图实现这一切,但是当消息到达并触发回调函数时,队列是空的,而在消息到达之前,我将客户端连接到它并验证确实它的会话 ID 已放入队列。
我的猜测是消费者线程使用线程启动时为空的队列副本,并且在客户端连接后在 Flask 线程中更新队列后,此更改不会应用于消费者线程。
这是我的代码:
app = Flask(__name__)
socketio = SocketIO(app, ping_interval=5, async_mode='threading')
socketio.init_app(app, cors_allowed_origins="*")
client_queue = queue.Queue()
@socketio.on('connect')
def client_connected():
client_queue.put(request.sid, block=False)
def callback(ch, method, properties, body):
try:
selected_client = client_queue.get(block=True, timeout=5)
except queue.Empty as e:
print(e)
print("No clients")
connection = pika.BlockingConnection(pika.ConnectionParameters(host=os.getenv("RABBITMQ")))
channel = connection.channel()
channel.queue_declare(queue=os.getenv("QUEUE_NAME"), durable=True)
channel.basic_consume(queue=os.getenv("QUEUE_NAME"), on_message_callback=callback)
if __name__ == '__main__':
socketio.start_background_task(target=channel.start_consuming)
socketio.run(app, debug=True)
因此,当我运行此代码时,使用 SocketIO 客户端连接到服务器,然后通过 RabbitMQ 发送消息,这将导致空队列预期(不打印客户端)。我在这里做错了什么?
解决方案
推荐阅读
- javascript - 使用 Javascript 将页面导出为 PDF
- java - 扩展扩展另一个抽象类的抽象类时的构造函数问题
- networking - java - 如何使多个虚拟终端设备在使用java的虚拟网络udp通信中与协调器通信?
- gitlab - 排除 merge_request,在 gitlab CI 管道中推送创建作业
- json - 如何在我的逻辑应用程序中测试动态 json 表达式
- javascript - 使用 D3.js 在 SVG 内显示一个圆圈
- python - np.linalg.eigh() 的 Numpy 分段错误
- jmeter - 无法在 Apache JMeter 1.6 中打开插件管理器
- java - Java + Visual Studio 资源文件夹,以便我可以执行 getResourceAsStream(this.FILE_NAME)
- java - 为什么服务器和客户端不能一起启动?