首页 > 解决方案 > 如何有效地为 MQTT 设置连接超时?

问题描述

如果我的程序无法连接到 MQTT 服务器,我想确保它会崩溃。为此,我设置ConnectTimeout为 10 秒,但是当连接到不存在的服务器(名称不存在)时,对 MQTT 的调用挂起

package main

import (
    "fmt"
    "time"

    mqtt "github.com/eclipse/paho.mqtt.golang"
)

func main() {
    timeout, _ := time.ParseDuration("10s");
    opts := mqtt.NewClientOptions()
    opts.AddBroker("tcp://this.does.not.resolve.example.coooom:1883")
    opts.SetClientID("monitor")
    opts.SetOrderMatters(false)
    opts.SetConnectRetry(true)
    opts.SetConnectTimeout(timeout)
    client := mqtt.NewClient(opts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        panic(fmt.Sprintf("cannot connect to MQTT: %v", token.Error()))
    }
}

如何正确设置超时?

标签: gotimeoutmqttconnection-timeout

解决方案


文档中SetConnectRetry

SetConnectRetry 设置连接函数是否会在失败的情况下自动重试连接(如果为 true,则 Connect 函数返回的令牌将在连接建立或取消之前不会完成)如果 ConnectRetry 为 true,则应在OnConnect 处理程序 将此设置为 TRUE 允许在建立连接之前发布消息。

所以什么时候SetConnectRetry(true) Connect()会重试,直到它成功或者你停止它。不会尝试确定错误是否是永久性的,因为这非常困难(例如,无法解决this.does.not.resolve.example.coooom可能是由于互联网连接中断)。

注意:有一个单独的选项SetAutoReconnect控制断开的连接(在初始连接成功建立后)是否会导致客户端自动重新建立连接。

值得注意的是,添加该选项的主要原因ConnectRetry是允许用户在连接建立之前发布消息(它们将在连接建立时自动发送)。这个想法是您应该能够调用Connect然后使用该库而不必担心网络状态(显然,如果网络关闭,您将不会收到消息!)。

就如何与以下内容进行交互而言,SetConnectTimeout文档可能并不像应有的那样清晰SetConnectRetry

SetConnectTimeout 限制客户端在尝试打开与 MQTT 服务器的连接时在超时之前等待的时间。持续时间为 0 永远不会超时。默认 30 秒。目前仅在 TCP/TLS 连接上运行。

因此,这控制了我们将等待单个连接尝试完成的时间。SetConnectRetry (true)每次尝试连接时都将使用此间隔(每次尝试重新开始)。没有RetryConnectionsFor类型设置。

来自自我回答:

...连接正确失败(并且,如果连接断开,希望客户端仍会重新连接)

NewClient(o *ClientOptions)一旦被调用,您就无法更改选项(NewClient获取选项的副本)。这样做是因为在操作进行时更改选项会导致不可预测的结果(例如,如果您在调用opts.SetConnectRetry(true)后调用,Connect那么结果将取决于Connect调用是否已完成,这是不可预测的)。

我不太确定“如果我的程序无法连接到 MQTT 服务器,请确保它崩溃”是什么意思,但我使用的方法类似于以下内容(替换fmt.PrintLn语句的适当操作) - 请注意,我有未编译/测试:

package main

import (
    "fmt"
    mqtt "github.com/eclipse/paho.mqtt.golang"
    "time"
)

func main() {
    timeout, _ := time.ParseDuration("10s")
    opts := mqtt.NewClientOptions()
    opts.AddBroker("tcp://this.does.not.resolve.example.coooom:1883")
    opts.SetClientID("monitor")
    opts.SetOrderMatters(false)
    opts.SetAutoReconnect(true).SetMaxReconnectInterval(10 * time.Second)
    opts.SetConnectRetry(true)
    opts.SetConnectTimeout(timeout)
    client := mqtt.NewClient(opts)

    token := client.Connect()

    go func(token mqtt.Token) {
    for {
        done := token.WaitTimeout(1 * time.Minute)
        if done {
            if token.Error() != nil {
                fmt.Println("Connection permanently failed (most probably due to call to Disconnect)", token.Error())
            } else {
                fmt.Println("Connection established")
            }
            return // We are done!

        }
        fmt.Println("Async MQTT Connection still trying (there could be an issue!)")
        // Can call `client.Disconnect()` to cancel connection if you want to cancel the connection attempts
    }
    }(token)

    // Do some stuff - you can publish messages and the library will send them when the connection comes up
}

推荐阅读