首页 > 解决方案 > 将 paho MQTT 与 Python 中的另一个异步进程相结合

问题描述

我在 Python 中有一个基本的 MQTTLListener 类,它侦听有关某些主题的消息,并且应该启动或停止从另一个脚本导入的异步进程。这个过程永远运行,除非它被手动停止。让我们假设 Listener 看起来像这样:

import paho.mqtt.client as mqtt
import json
from python_file.py import async_forever_function

class MqttListener:

    def __init__(self, host, port, client_id):
        self.host = host
        self.port = port
        self.client_id = client_id
        self.client = mqtt.Client(client_id=self.client_id)
        self.client.connect(host=self.host, port=self.port)

    def on_connect(self, client, userdata, flags, rc):
        self.client.subscribe(topic=[("start", 1), ])
        self.client.subscribe(topic=[("stop", 1), ])
        logging.info(msg="MQTT - connected!")

    def on_disconnect(client, userdata, rc):
        logging.info(msg="MQTT - disconnected!")

    def on_message(self, client, userdata, message, ):
        print('PROCESSING MESSAGE', message.topic, message.payload.decode('utf-8'), )

        if message.topic == 'start':
            async_forever_function(param='start')
            print('process started')
        else:
            async_forever_function(param='stop')
            print('process removed')

    def start(self):
        self.client.on_connect = lambda client, userdata, flags, rc: self.on_connect(client, userdata, flags, rc)
        self.client.on_message = lambda client, userdata, message: self.on_message(client, userdata, message)
        self.client.on_disconnect = lambda client, userdata, rc: self.on_disconnect(client, userdata, rc)

        self.client.loop_start()

    def stop(self):
        self.client.loop_stop()

现在,这适用于启动一个新的异步进程。也就是说,当一条消息发布到启动 MQTT 主题时,会正确触发 async_function。然而,一旦这个异步进程启动,监听器将不再能够接收/处理来自停止 MQTT 主题的消息,并且异步进程将继续永远运行,而实际上它应该已经停止。

我的问题:如何调整此类的代码,以便在后台运行活动的异步进程时也可以处理消息?

标签: python-3.xasynchronousmqtt

解决方案


您不能在on_message()回调中执行阻塞任务。

此回调在 MQTT 客户端线程(由该loop_start()函数启动的线程)上运行。该线程处理所有网络流量和消息处理,如果您阻止它,则它无法执行任何操作。

如果你想从on_message()回调中调用长时间运行的任务,你需要为长时间运行的任务启动一个新线程,这样它就不会阻塞 MQTT 客户端循环。


推荐阅读