python-3.x - 将 paho MQTT 与 Python 中的另一个异步进程相结合
问题描述
我在 Python 中有一个基本的 MQTTLListener 类,它侦听有关某些主题的消息,并且应该启动或停止从另一个脚本导入的异步进程。这个过程永远运行,除非它被手动停止。让我们假设 Listener 看起来像这样:
import paho.mqtt.client as mqtt
import json
from python_file.py import async_forever_function
class MqttListener:
def __init__(self, host, port, client_id):
self.host = host
self.port = port
self.client_id = client_id
self.client = mqtt.Client(client_id=self.client_id)
self.client.connect(host=self.host, port=self.port)
def on_connect(self, client, userdata, flags, rc):
self.client.subscribe(topic=[("start", 1), ])
self.client.subscribe(topic=[("stop", 1), ])
logging.info(msg="MQTT - connected!")
def on_disconnect(client, userdata, rc):
logging.info(msg="MQTT - disconnected!")
def on_message(self, client, userdata, message, ):
print('PROCESSING MESSAGE', message.topic, message.payload.decode('utf-8'), )
if message.topic == 'start':
async_forever_function(param='start')
print('process started')
else:
async_forever_function(param='stop')
print('process removed')
def start(self):
self.client.on_connect = lambda client, userdata, flags, rc: self.on_connect(client, userdata, flags, rc)
self.client.on_message = lambda client, userdata, message: self.on_message(client, userdata, message)
self.client.on_disconnect = lambda client, userdata, rc: self.on_disconnect(client, userdata, rc)
self.client.loop_start()
def stop(self):
self.client.loop_stop()
现在,这适用于启动一个新的异步进程。也就是说,当一条消息发布到启动 MQTT 主题时,会正确触发 async_function。然而,一旦这个异步进程启动,监听器将不再能够接收/处理来自停止 MQTT 主题的消息,并且异步进程将继续永远运行,而实际上它应该已经停止。
我的问题:如何调整此类的代码,以便在后台运行活动的异步进程时也可以处理消息?
解决方案
您不能在on_message()
回调中执行阻塞任务。
此回调在 MQTT 客户端线程(由该loop_start()
函数启动的线程)上运行。该线程处理所有网络流量和消息处理,如果您阻止它,则它无法执行任何操作。
如果你想从on_message()
回调中调用长时间运行的任务,你需要为长时间运行的任务启动一个新线程,这样它就不会阻塞 MQTT 客户端循环。
推荐阅读
- python - 合并多个 Pandas Dataframe 对象
- excel - Excel 中的范围与 Google 表格中的范围
- xamarin - 为什么在尝试将 Xamarin 应用程序部署到本地 iPhone 时突然出现“运行热重启时出错”?
- c# - 互斥锁在 C# 中发布时不工作,但调试工作正常
- go - 小端或大端:具有相同代码的二进制编码
- ios - 解雇不在两个 Swift UI 视图之一中工作
- python - 转换大写/小写的列表元素,然后在python中删除重复项
- flutter - Flutter:即使在设置提供程序后也获得空值
- flutter - Dart Isolate 利用 Timer 周期不会停止
- python - 如何使用 Python 查找值序列