python - Paho MQTT - 日志显示消息已发布但未发送任何内容
问题描述
我真的很难让这个基本概念发挥作用,我可能在这个问题上花了 15 个小时,寻找答案的时间充满了无用的矛盾信息。
我有一个订阅 MQTT 主题的客户端进程,它会根据收到的消息类型做出反应。然后我想将数据发送回岸边流程。
我尝试过永远循环,开始循环,不循环,打开和打开,但数据没有出现在 MQTT Explorer 中(1883 上的本地端口转发以检查实时提要)
代码
主要的
import os
import paho.mqtt.client as mqtt
import logging
import time
import lib.mqtt.mqtt_actions as mqtt_act
logging.basicConfig(level=logging.INFO, filename='/data/client.log', filemode='a+',format='%(asctime)s - %(name)s : %(levelname)s - %(message)s')
def main():
client = mqtt.Client()
client.tls_set(-setup tls certs-)
client.on_connect = mqtt_act.on_connect
client.on_message = mqtt_act.on_message
client.on_log = mqtt_act.on_log
client.connect(-connect to broker-)
client.loop_forever()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
client.loop_stop()
if __name__ == "__main__":
main()
MQTT 操作
import json
import logging
import time
import subprocess
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def send_ack(client, _id):
client.publish('a/topic/state_change', payload='{"id": "'+str(_id)+'","status": "START", "code": "003", "timestamp": "'+str(datetime.datetime.now())+'"}')
logging.info("send ack")
client.loop()
def on_log(client,userdata,level,buff):
print(buff)
def on_connect(client, userdata, flags, rc):
logging.info("Connected with return code: "+str(rc))
# get some sort of identifier
_id = get_id()
client.subscribe('a/topic/'+_id+'/#')
def on_message(client, userdata, msg):
_ = json.loads(msg.payload.decode())
logging.info("Received message from master")
if _['action'] == "test123":
_id = _['id']
logging.info("ID: "+str(_id))
send_ack(client, _id)
outcome = get_data()
outcome['id'] = str(_id)
try:
client.publish("a/topic/endpoint/"+str(_id), str(outcome))
client.loop()
except Exception as e:
logging.info(str(e))
输出
host:~$:/tmp/client_test# python3 listen.py
Sending CONNECT (u0, p0, wr0, wq0, wf0, c1, k60) client_id=b''
Received CONNACK (0, 0)
Sending SUBSCRIBE (d0, m1) [(b'a/topic/222/#', 0)]
Received SUBACK
Received PUBLISH (d0, q0, r0, m0), 'a/topic/222/test123', ... (89 bytes)
Sending PUBLISH (d0, q0, r0, m2), 'b'a/topic/endpoint/state_change'', ... (122 bytes)
Sending PUBLISH (d0, q0, r0, m3), 'b'a/topic/endpoint/64cb76fa-791b-11eb-bde4-005056ae7e22'', ... (2596 bytes)
Received PUBLISH (d0, q0, r0, m0), 'a/topic/222/test123', ... (89 bytes)
MQTT Explorer 中显示的内容
- state_change
- 测试123
解决方案
所以我接受了给出的建议并设置了线程,我也将其更改loop_forever()
为loop_start()
主要的
import os
import paho.mqtt.client as mqtt
import logging
import time
import lib.mqtt.mqtt_actions as mqtt_act
logging.basicConfig(level=logging.INFO, filename='/data/client.log', filemode='a+',format='%(asctime)s - %(name)s : %(levelname)s - %(message)s')
def main():
client = mqtt.Client()
client.tls_set(-setup tls certs-)
client.on_connect = mqtt_act.on_connect
client.on_message = mqtt_act.on_message
client.on_log = mqtt_act.on_log
client.connect(-connect to broker-)
client.loop_start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
client.loop_stop()
if __name__ == "__main__":
main()
MQTT
import json
import logging
import time
import threading
import subprocess
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def send_ack(client, _id):
client.publish('a/topic/state_change', payload='{"id": "'+str(_id)+'","status": "START", "code": "003", "timestamp": "'+str(datetime.datetime.now())+'"}')
logging.info("send ack")
def on_log(client,userdata,level,buff):
print(buff)
def on_connect(client, userdata, flags, rc):
logging.info("Connected with return code: "+str(rc))
# get some sort of identifier
_id = get_id()
client.subscribe('a/topic/'+_id+'/#')
def on_message(client, userdata, msg):
_ = json.loads(msg.payload.decode())
logging.info("Received message from master")
if _['action'] == "test123":
_id = _['id']
logging.info("ID: "+str(_id))
send_ack(client, _id)
t1 = threading.Thread(target=start_test, args=(_id, 5,client,), name='test_thread')
t1.start()
def start_test(ID, int_a, client):
outcome = get_data()
outcome['id'] = ID
data = json.dumps(outcome)
try:
client.publish("a/test/endpoint/"+str(outcome['id']), data)
except Exception as e:
logging.info(str(e))
推荐阅读
- android - 如何杀死 Kotlin 中的协程?
- arrays - 使用 InstanceID 禁用/启用 PnP 设备的脚本 - 但重新启动时 ID 会更改
- python - C 和 Python 中的浮点数之间的差异
- python - 如果字符串中存在子字符串并与不区分大小写的python匹配,则返回子字符串
- python - 嵌套for循环仅在Python中网格的第一行上运行
- android - 在 Android 12 上应多久更新一次大致位置
- python-3.x - 如何从列中提取与列表匹配的单词?
- javascript - 新实例字段语法
- c# - C# 正则表达式 - 似乎无法始终如一地工作
- angular - Zuul 没有路由到 Angular 资源