首页 > 解决方案 > mqtt客户端在线程和rest-api的情况下没有收到消息

问题描述

我有一个基于flask和mqtt的python脚本。用例是通过 rest-api 接收请求,然后创建一个新线程,该线程在 mosquitto mqtt 上发布一些消息并期望响应(请参阅订阅)。我的问题是我没有收到任何消息。我认为这与线程有关,因为没有线程它工作得很好。你知道可能是什么问题吗?

谢谢你的期待!

这里的代码:

from flask import Flask, Response
import paho.mqtt.client as mqtt
from threading import Thread
import threading

app = Flask(__name__)
lock = threading.Lock()

def on_connect(client, userdata, flags, rc):  # The callback for when the client connects to the broker
    print("Connected with result code {0}".format(str(rc)))  # Print result of connection attempt
    client.subscribe("/mytopic")


def on_message(client, userdata, msg):  # The callback for when a PUBLISH message is received from the server.
    print(msg.topic)


client = mqtt.Client(client_id=client_name, clean_session=True)
client.on_connect = on_connect  # Define callback function for successful connection
client.on_message = on_message  # Define callback function for receipt of a message
client.username_pw_set(mqtt_user, mqtt_password)
client.loop_start()
client.connect(mqtt_host)    


def test(param1, param2):
   lock.acquire()
   try:

      ret = client.publish("/mytopic", "")
      while True:
            check the response from mqtt => but i don't get the response anymore
            ....
            break
    finally:
        lock.release()
    return result


@app.route('/test/check', methods=['POST'])
def check():
    global sessionId
    sessionId = sessionId + 1
    t = Thread(target=test, args=(sessionId,None))
    t.start()
    return {'id': sessionId, 'eta': 0}


if __name__ == '__main__':
    app.run(debug=True)

标签: pythonmultithreadingflaskmqtt

解决方案


这有几个问题。

  1. client.connect()和调用都client.subscribe()需要客户端网络循环的迭代才能正确完成。
  2. 网络循环需要在建立连接后的每个保持活动期间至少运行一次,以阻止代理断开客户端的连接。这意味着如果在启动代码和第一个 REST 请求之间存在延迟,则客户端将断开连接。

最好使用该client.start_loop()功能自行在后台连续运行 MQTT 客户端网络循环。

您还应该删除回调client.subscribe()之外的on_connect()调用。

编辑:正如评论/聊天中所讨论的那样,以下作品。看起来在调试模式下运行烧瓶应用程序会做一些奇怪的事情,并一遍又一遍地创建多个具有相同客户端 ID 的 MQTT 客户端。这导致代理不断地踢掉旧的,因此消息永远不会被传递。

from flask import Flask, Response
import paho.mqtt.client as mqtt
import time
from threading import Thread
import threading

app = Flask(__name__)
lock = threading.Lock()
sessionId=0
cont=True

def on_connect(client, userdata, flags, rc): # The callback for when the client connects to the broker
print("Connected with result code {0}".format(str(rc))) # Print result of connection attempt
client.subscribe("mytopic")


def on_message(client, userdata, msg): # The callback for when a PUBLISH message is received from the server.
global cont
print(msg.topic)
cont=False


client = mqtt.Client(client_id="foo", clean_session=True)
client.on_connect = on_connect # Define callback function for successful connection
client.on_message = on_message # Define callback function for receipt of a message
#client.username_pw_set(mqtt_user, mqtt_password)
client.connect("localhost", port=1884)
client.loop_start()

def test(param1, param2):
lock.acquire()
try:
ret = client.publish("mytopic", "foo")
while cont:
time.sleep(5)
print("loop")
finally:
lock.release()

result = "foo"

return result


@app.route('/test/check', methods=['POST'])
def check():
global sessionId
sessionId = sessionId + 1
t = Thread(target=test, args=(sessionId,None))
t.start()
return {'id': sessionId, 'eta': 0}


if __name__ == '__main__':
print("started")
app.run()

推荐阅读