首页 > 解决方案 > 是否从 azure 服务总线的 go 客户端库处理重试?

问题描述

我在https://github.com/Azure/azure-service-bus-go中引用客户端库。我能够编写一个简单的客户端来监听订阅并读取消息。如果我断开网络,几秒钟后我可以看到接收循环退出,并显示错误消息“上下文已取消”。我希望客户端库会做某种重试机制并处理任何连接问题或任何服务器超时等。我们需要自己处理吗?此外,我们是否得到任何可以识别并将其用于此重试机制的异常?任何示例代码将不胜感激。

下面是我尝试过的示例代码(仅包括重要部分)。

err = subscription.Receive(ctx, servicebus.HandlerFunc(func(ctx context.Context, message *servicebus.Message) error {
        fmt.Println(string(message.Data))
        return message.Complete(ctx)
    }))

if err != nil {
    fmt.Println("FATAL: ", err)
    return
}

标签: azuregoazureservicebusazure-servicebus-topics

解决方案


不幸的是,较旧的库在重试方面做得不好,因此您需要将代码包装在自己的重试循环中。

func demoReceiveWithRecovery(ns *servicebus.Namespace) {
        parentCtx := context.TODO()

    for {
        q, err := ns.NewQueue(queueName)

        if err != nil {
            panic(err)
        }

        defer q.Close(parentCtx)

        handler := servicebus.HandlerFunc(func(c context.Context, m *servicebus.Message) error {
            log.Printf("Received message")
            return m.Complete(c)
        })

        err = q.Receive(parentCtx, handler)

        // check for potential recoverable situation
        if err != nil && errors.Is(err, context.Canceled) {
            // This workaround distinguishes between the handler cancelling because the
            // library has disconnected vs the parent context (passed in by you) being cancelled.
            if parentCtx.Err() != nil {
                // our parent context cancelled, which caused the entire operation to be canceled
                log.Printf("Cancelled by parent context")
                return
            } else {
                // cancelled internally due to an error, we can restart the queue
                log.Printf("Error occurred, restarting")
                _ = q.Close(parentCtx)
                continue
            }
        } else {
            log.Printf("Other error, closing client and restarting: %s", err.Error())
            _ = q.Close(parentCtx)
        }
    }
}

注意:此库最近已弃用,但在您发布问题后已弃用。如果您想尝试的话,新包就在这里 - azservicebus带有迁移指南,让事情变得更容易:migrationguide.md

新包裹应该在这种情况下正确恢复以便接收。我正在为发送方进行恢复#16695的新软件包中存在一个未解决的错误。

如果您想提出更多问题或有一些功能请求,您可以在https://github.com/Azure/azure-sdk-for-go的 github 问题中提交这些请求。


推荐阅读