首页 > 解决方案 > Paho MQTT - 日志显示消息已发布但未发送任何内容

问题描述

我实在没法让这个基本功能正常工作。我在这个问题上可能已经花了 15 个小时,而查到的资料大多互相矛盾、毫无帮助。

我有一个订阅 MQTT 主题的客户端进程,它会根据收到的消息类型做出反应。然后我想将数据发送回岸边流程。

我尝试过 loop_forever()、loop_start()、完全不调用循环,以及各种开关组合,但数据始终没有出现在 MQTT Explorer 中(通过本地端口转发 1883 来查看实时数据)。

代码

主程序

import os
import paho.mqtt.client as mqtt
import logging
import time
import lib.mqtt.mqtt_actions as mqtt_act
logging.basicConfig(level=logging.INFO, filename='/data/client.log', filemode='a+',format='%(asctime)s - %(name)s : %(levelname)s - %(message)s')

def main():
    """Create the MQTT client, register the callbacks, connect and run the loop.

    This is the listing from the question. The arguments to ``tls_set`` and
    ``connect`` are placeholders left by the author and are not valid Python.
    """
    client = mqtt.Client()
    client.tls_set(-setup tls certs-)
    # Callbacks live in lib.mqtt.mqtt_actions (imported as mqtt_act).
    client.on_connect = mqtt_act.on_connect
    client.on_message = mqtt_act.on_message
    client.on_log = mqtt_act.on_log
    client.connect(-connect to broker-)
    # NOTE(review): paho documents loop_forever() as a blocking call, so the
    # try/while block below is not reached while the client keeps running.
    client.loop_forever()
    try:
     while True:
      time.sleep(1)
    except KeyboardInterrupt:
        # loop_stop() pairs with loop_start(), which this version never calls.
        client.loop_stop()

if __name__ == "__main__":
    main()

MQTT 操作

import json
import logging
import time
import subprocess 
logger = logging.getLogger()
logger.setLevel(logging.INFO)    

def send_ack(client, _id):
    """Publish a START acknowledgement for ``_id`` on 'a/topic/state_change'.

    The payload is a JSON-shaped string assembled by concatenation from the
    stringified id and the current local time.
    """
    # NOTE(review): `datetime` is not among the imports shown for this module;
    # confirm it is imported in the real file, otherwise this raises NameError.
    client.publish('a/topic/state_change', payload='{"id": "'+str(_id)+'","status": "START", "code": "003", "timestamp": "'+str(datetime.datetime.now())+'"}')
    logging.info("send ack")
    # NOTE(review): send_ack is called from on_message, i.e. from inside the
    # network loop that main() started; a manual loop() call here looks
    # unnecessary — confirm against the paho network-loop documentation.
    client.loop()

def on_log(client,userdata,level,buff):
    """Print the client's log line ``buff`` to stdout.

    ``client``, ``userdata`` and ``level`` are accepted but not used.
    """
    print(buff)

def on_connect(client, userdata, flags, rc):
        """Log the connect return code and subscribe to 'a/topic/<id>/#'.

        ``userdata`` and ``flags`` are accepted but not used. The subscription
        is issued whatever the value of ``rc``.
        """
        logging.info("Connected with return code: "+str(rc))
        # get some sort of identifier
        # get_id() is not shown in this listing; the concatenation below
        # requires it to return a str.
        _id = get_id()
        client.subscribe('a/topic/'+_id+'/#')
def on_message(client, userdata, msg):
        """Parse the payload as JSON and react to the 'test123' action.

        For a 'test123' message this sends an acknowledgement via send_ack(),
        collects data with get_data() (not shown in this listing) and publishes
        it on 'a/topic/endpoint/<id>'. Other actions are ignored.
        """
        # `_` holds the decoded message; 'action' and 'id' are read from it.
        _ = json.loads(msg.payload.decode())
        logging.info("Received message from master")
        if _['action'] == "test123":
            _id = _['id']
            logging.info("ID: "+str(_id))
            send_ack(client, _id)
            outcome = get_data()
            outcome['id'] = str(_id)
            try:
             # NOTE(review): str(outcome) sends Python's str() rendering rather
             # than JSON (presumably outcome is a dict — confirm).
             client.publish("a/topic/endpoint/"+str(_id), str(outcome))
             # NOTE(review): manual loop() from inside a callback; see the paho
             # network-loop documentation — confirm this is intended.
             client.loop()
            except Exception as e:
                # Failures are recorded at INFO level with the message only.
                logging.info(str(e))

输出

host:~$:/tmp/client_test# python3 listen.py 
Sending CONNECT (u0, p0, wr0, wq0, wf0, c1, k60) client_id=b''
Received CONNACK (0, 0)
Sending SUBSCRIBE (d0, m1) [(b'a/topic/222/#', 0)]
Received SUBACK
Received PUBLISH (d0, q0, r0, m0), 'a/topic/222/test123', ...  (89 bytes)
Sending PUBLISH (d0, q0, r0, m2), 'b'a/topic/endpoint/state_change'', ... (122 bytes)
Sending PUBLISH (d0, q0, r0, m3), 'b'a/topic/endpoint/64cb76fa-791b-11eb-bde4-005056ae7e22'', ... (2596 bytes)
Received PUBLISH (d0, q0, r0, m0), 'a/topic/222/test123', ...  (89 bytes)

MQTT Explorer 中显示的内容

标签: python python-3.x mqtt paho

解决方案


所以我接受了给出的建议并使用了线程,同时把 loop_forever() 改成了 loop_start()。

主程序

import os
import paho.mqtt.client as mqtt
import logging
import time
import lib.mqtt.mqtt_actions as mqtt_act
logging.basicConfig(level=logging.INFO, filename='/data/client.log', filemode='a+',format='%(asctime)s - %(name)s : %(levelname)s - %(message)s')

def main():
    client = mqtt.Client()
    client.tls_set(-setup tls certs-)
    client.on_connect = mqtt_act.on_connect
    client.on_message = mqtt_act.on_message
    client.on_log = mqtt_act.on_log
    client.connect(-connect to broker-)
    client.loop_start()
    try:
     while True:
      time.sleep(1)
    except KeyboardInterrupt:
        client.loop_stop()

if __name__ == "__main__":
    main()

MQTT

import json
import logging
import time
import threading
import subprocess 
logger = logging.getLogger()
logger.setLevel(logging.INFO)    

def send_ack(client, _id):
    """Publish a START acknowledgement for ``_id`` on 'a/topic/state_change'.

    Args:
        client: MQTT client; only its ``publish`` method is used.
        _id: Identifier echoed back in the payload (stringified).
    """
    # Imported locally because the module's import block does not provide
    # ``datetime``; without it the timestamp expression raises NameError.
    import datetime

    # json.dumps escapes quotes and backslashes in the id, so the payload is
    # valid JSON for any id; hand-concatenated strings are not.
    payload = json.dumps({
        "id": str(_id),
        "status": "START",
        "code": "003",
        "timestamp": str(datetime.datetime.now()),
    })
    client.publish('a/topic/state_change', payload=payload)
    logging.info("send ack")

def on_log(client,userdata,level,buff):
    """Print the client's log line ``buff`` to stdout.

    ``client``, ``userdata`` and ``level`` are accepted but not used.
    """
    print(buff)

def on_connect(client, userdata, flags, rc):
    """Log the connect result and, on success, subscribe to 'a/topic/<id>/#'.

    Args:
        client: MQTT client that received the CONNACK.
        userdata: Unused.
        flags: Unused.
        rc: Connect return code; 0 means the broker accepted the connection.
    """
    logging.info("Connected with return code: "+str(rc))
    if rc != 0:
        # A refused connection cannot carry a subscription, so stop here and
        # make the failure visible in the log.
        logging.error("Connection refused (rc=%s); not subscribing", rc)
        return
    # get some sort of identifier
    _id = get_id()
    # str() keeps the topic build working for a non-str id as well.
    client.subscribe('a/topic/'+str(_id)+'/#')
def on_message(client, userdata, msg):
    """Handle an inbound message and start the test for a 'test123' request.

    The payload is expected to be a JSON object with 'action' and 'id' keys.
    For action 'test123' an acknowledgement is sent with send_ack() and
    start_test() is launched on a separate thread. Anything else is ignored.

    Args:
        client: MQTT client the message arrived on.
        userdata: Unused.
        msg: Received message; only ``msg.payload`` (bytes) is read.
    """
    try:
        message = json.loads(msg.payload.decode())
    except (UnicodeDecodeError, json.JSONDecodeError):
        # A bad payload would otherwise raise out of the callback.
        logging.warning("Ignoring message with a payload that is not valid JSON")
        return
    logging.info("Received message from master")

    if not isinstance(message, dict) or message.get('action') != "test123":
        return
    if 'id' not in message:
        logging.warning("Ignoring test123 request without an id")
        return

    _id = message['id']
    logging.info("ID: "+str(_id))
    send_ack(client, _id)
    # The test runs off the callback so message handling returns promptly.
    worker = threading.Thread(target=start_test, args=(_id, 5, client), name='test_thread')
    worker.start()

def start_test(ID, int_a, client):
    """Collect the test data and publish it as JSON for request ``ID``.

    Args:
        ID: Request identifier; stored in the outcome and appended to the
            topic 'a/test/endpoint/'.
        int_a: Unused; kept so existing callers keep working.
        client: MQTT client used to publish the result.
    """
    try:
        outcome = get_data()
        outcome['id'] = ID
        data = json.dumps(outcome)
        result = client.publish("a/test/endpoint/"+str(ID), data)
    except Exception:
        # Covers data collection and serialisation as well as publish, and
        # records the traceback so a failure is not lost.
        logging.exception("Failed to publish test outcome for id %s", ID)
        return
    # publish() reports most failures through the returned rc rather than by
    # raising; 0 is MQTT_ERR_SUCCESS.
    if result.rc != 0:
        logging.error("Publish of test outcome for id %s failed (rc=%s)", ID, result.rc)

推荐阅读