python - mqtt客户端在线程和rest-api的情况下没有收到消息
问题描述
我有一个基于flask和mqtt的python脚本。用例是通过 rest-api 接收请求,然后创建一个新线程,该线程在 mosquitto mqtt 上发布一些消息并期望响应(请参阅订阅)。我的问题是我没有收到任何消息。我认为这与线程有关,因为没有线程它工作得很好。你知道可能是什么问题吗?
谢谢你的期待!
这里的代码:
from flask import Flask, Response
import paho.mqtt.client as mqtt
from threading import Thread
import threading
app = Flask(__name__)
lock = threading.Lock()
def on_connect(client, userdata, flags, rc): # The callback for when the client connects to the broker
print("Connected with result code {0}".format(str(rc))) # Print result of connection attempt
client.subscribe("/mytopic")
def on_message(client, userdata, msg): # The callback for when a PUBLISH message is received from the server.
print(msg.topic)
client = mqtt.Client(client_id=client_name, clean_session=True)
client.on_connect = on_connect # Define callback function for successful connection
client.on_message = on_message # Define callback function for receipt of a message
client.username_pw_set(mqtt_user, mqtt_password)
client.loop_start()
client.connect(mqtt_host)
def test(param1, param2):
lock.acquire()
try:
ret = client.publish("/mytopic", "")
while True:
check the response from mqtt => but i don't get the response anymore
....
break
finally:
lock.release()
return result
@app.route('/test/check', methods=['POST'])
def check():
global sessionId
sessionId = sessionId + 1
t = Thread(target=test, args=(sessionId,None))
t.start()
return {'id': sessionId, 'eta': 0}
if __name__ == '__main__':
app.run(debug=True)
解决方案
这有几个问题。
client.connect()
和调用都client.subscribe()
需要客户端网络循环的迭代才能正确完成。- 网络循环需要在建立连接后的每个保持活动期间至少运行一次,以阻止代理断开客户端的连接。这意味着如果在启动代码和第一个 REST 请求之间存在延迟,则客户端将断开连接。
最好使用该client.start_loop()
功能自行在后台连续运行 MQTT 客户端网络循环。
您还应该删除回调client.subscribe()
之外的on_connect()
调用。
编辑:正如评论/聊天中所讨论的那样,以下作品。看起来在调试模式下运行烧瓶应用程序会做一些奇怪的事情,并一遍又一遍地创建多个具有相同客户端 ID 的 MQTT 客户端。这导致代理不断地踢掉旧的,因此消息永远不会被传递。
from flask import Flask, Response
import paho.mqtt.client as mqtt
import time
from threading import Thread
import threading
app = Flask(__name__)
lock = threading.Lock()
sessionId=0
cont=True
def on_connect(client, userdata, flags, rc): # The callback for when the client connects to the broker
print("Connected with result code {0}".format(str(rc))) # Print result of connection attempt
client.subscribe("mytopic")
def on_message(client, userdata, msg): # The callback for when a PUBLISH message is received from the server.
global cont
print(msg.topic)
cont=False
client = mqtt.Client(client_id="foo", clean_session=True)
client.on_connect = on_connect # Define callback function for successful connection
client.on_message = on_message # Define callback function for receipt of a message
#client.username_pw_set(mqtt_user, mqtt_password)
client.connect("localhost", port=1884)
client.loop_start()
def test(param1, param2):
lock.acquire()
try:
ret = client.publish("mytopic", "foo")
while cont:
time.sleep(5)
print("loop")
finally:
lock.release()
result = "foo"
return result
@app.route('/test/check', methods=['POST'])
def check():
global sessionId
sessionId = sessionId + 1
t = Thread(target=test, args=(sessionId,None))
t.start()
return {'id': sessionId, 'eta': 0}
if __name__ == '__main__':
print("started")
app.run()
推荐阅读
- r - 重命名列表中的序列数字数据框列
- r - 如何在ggplot中找到geom_curve控制点
- c# - 在 MacOS 与 ASP.Net Web API 上使用 curl 的 HTTP 标头的奇怪行为
- corda - Corda 附件流程
- javascript - Javascript:将一组键变成一个空对象?
- reactjs - React 为不正确的路径提供默认内容
- xamarin.android - Xamarin Android 应用程序在启动画面中挂起
- c - 为什么 ARM 将“0xFFFFFFFF”视为超出“int”范围的枚举而不是分配?
- python - Pandas DataFrame - 根据标准就地修改
- r - R - lubridate:将持续时间分成“子持续时间”