python - 处理迟到的 MQTT 消息
问题描述
我正在制作一个使用 MQTT 发送和接收数据的 Python 程序。发送设备通常是便携式的,并且可能没有可靠的连接,因此消息很可能迟到。
为了使程序正常运行,消息需要以某种及时的方式到达,比如 20 秒左右。如果消息晚于该时间到达,则需要进行一些特殊处理。
我的问题是确定消息是否迟到,如果迟到了多少。我想到的第一件事是发布者向订阅者可以检查的消息添加时间戳,但这取决于发布者和订阅者是否具有同步时钟。无法保证该程序的所有用户都将拥有准确的系统时间。也不一定保证具有管理员访问权限以更正系统时间。
另一个想法是使用 ntplib 或类似的东西在发布时应用来自 NTP 服务器的时间戳,但这也取决于拥有可靠的互联网连接,这是我首先要解决的问题。
是否有另一种方法可以在发布者和订阅者之间获得同步时间?确定我没有想到的消息延迟的另一种方法是什么?
更新
我的想法是在连接时从 NTP 服务器获取时间偏移,然后将该偏移应用于从 time.time() 生成的时间戳。例如:
import ntplib
time_offset = 0
ntp_client = ntplib.NTPClient()
def update_time_offset():
global time_offset
try:
resp = ntp_client.request('pool.ntp.org')
time_offset = resp.offset
logger.info(f"Time offset set to {time_offset}.")
return True
except:
logger.warn("Unable to update time offset.")
return update_time_offset()
def on_connect(client, userdata, flags, rc):
t = Thread(target=update_time_offset)
t.daemon = True
t.start()
发布功能:
def publish(message):
now = time.time()
timestamp = now + time_offset
payload = json.dumps({"message": message, "timestamp": timestamp})
mqtt_client.publish("topic_name", payload, qos=1)
print("Published to MQTT.")
以及订阅功能:
def _new_message(self, client, userdata, message):
payload = json.loads(message.payload.decode())
now_corrected = time.time() + time_offset
message_transit_time = round(now_corrected - payload["timestamp"], 1)
message_data = payload["message"]
stripped_message = " ".join(message_data.split())
print(f"Message {stripped_message} transit time: {message_transit_time}.")
这在理论上应该可行,但我遇到了以下问题:一些用户在启动时在树莓派等设备上运行程序。树莓派没有硬件时钟,因此系统时间在启动时不正确,直到被 ntpd 更新。我的程序在系统时间更新之前得到了时间偏移,导致运输时间非常不正确。我现在认为,与其给我的代码增加更多的复杂性,我会相信用户的系统时间是相对准确的,看看我是怎么做的。
解决方案
不
如果您需要知道消息是否需要超过 20 秒的时间才能传递,那么您别无选择,只能让所有设备都可以访问匹配的时间源。
标记消息创建时间的唯一方法是在消息中包含时间戳,这只能与接收设备的当前时间进行检查。
您可以将当前时间作为给定主题的保留消息发布,这样所有内容都具有大致共享的时间来源。如果在 QOS 0 完成,则不会排队,因此不应迟到
推荐阅读
- c# - "SqlCommandTimeout" value="120" SQL 异常
- apache-spark - Databricks - 如何确定分区数?
- firebase - 对等点之间的 Flutter-Firebase 文件查看/下载
- python - 列表 A 是否有任何不在列表 B 中的项目
- angular - 查找 OpenLayers 数组的索引
- javascript - 从 WKWebView 中的 URL 运行 Javascript
- python - 在 Python 中取 np.ndarray 的点积
- javascript - e.target.value 未定义
- ios - Xcode - “无法启动
”, 没有这样的文件或目录错误 - sql - 自加入:如何选择一组特定的数据