首页 > 解决方案 > Python Azure 服务总线锁已过期

问题描述

我正在使用 azure 服务总线,我的锁已过期。

我如何实现锁定到 1 天,但我仍然收到错误

我的代码:

import json
from typing import Any, Dict, Generator, List, Optional


from azure.servicebus import (AutoLockRenewer, ServiceBusClient,
                              ServiceBusMessage)


class Queue():
    def __init__(
                self,
                connection_string: Optional[str] = None,
                queue_name: Optional[str] = None,
                max_lock_renewal_duration: Optional[int] = 86400,
                logging_enable: Optional[bool] = True
            ):
        

        self.queue_name = queue_name
        self.connection_string = connection_string
        self.logging_enable = logging_enable
        self.max_lock_renewal_duration = max_lock_renewal_duration
        self.client = ServiceBusClient.from_connection_string(
            conn_str=self.connection_string, logging_enable=logging_enable)
        self.sender = self.client.get_queue_sender(queue_name=self.queue_name)
        self.renewer = AutoLockRenewer(
                max_lock_renewal_duration=max_lock_renewal_duration)
        self.receiver = self.client.get_queue_receiver(
            queue_name=self.queue_name, auto_lock_renewer=self.renewer)

 
    def get_msgs(
                self,
                max_message_count: Optional[int] = 1,
                max_wait_time: Optional[int] = 5
            ) -> Generator[ServiceBusMessage, None, None]:
        

        for msg in self.receiver.receive_messages(
                    max_message_count=max_message_count,
                    max_wait_time=max_wait_time
                ):
            yield msg

     def abandon_msg(self, msg: ServiceBusMessage) -> None:
    

          self.receiver.abandon_message(msg)

q = Queue(connection_string="secret",
          queue_name="my_queue",
          logging_enable=True
        )
for msg in q.get_msgs(max_message_count=100, max_wait_time=5):
    print("process my messages")
    print("some reason abondan")

错误:

--- 记录错误 --- 回溯(最后一次调用):

文件“/test/.env/lib64/python3.6/site-packages/azure/servicebus/_servicebus_receiver.py”,第 782 行,在放弃消息 self._settle_message_with_retry(message, MESSAGE_ABANDON) 文件“/test/.env/lib64/ python3.6/site-packages/azure/servicebus/_servicebus_receiver.py",第 415 行,在 _settle_message_with_retry 错误=message.auto_renew_error,azure.servicebus.exceptions.ServiceBusError:消息锁上的锁已过期。

我的目标是放弃消息,以便它重试,还有一种方法可以增加重试时间,而不是像每个放弃消息的延迟 300 秒那样立即排队?

标签: pythonpython-3.xazureazureservicebusazure-servicebus-queues

解决方案


默认情况下,锁定时间将设置为 30 秒,我们可以将其增加到 5 分钟。

将锁定期设置为 1 天将被控制台本身拒绝,如下图所示:

在此处输入图像描述

更新到最大锁定时间后的屏幕截图如下:

在此处输入图像描述

但是,锁可以更新,直到函数完成。因为我们可以使用自动锁更新功能,您可以在其中指定要继续更新锁的持续时间。

下面的代码将帮助我们自动更新会话锁定100 秒

renewer.register(receiver, receiver.session, max_lock_renewal_duration=100)
print('Register session into AutoLockRenewer.')

received_msgs = receiver.receive_messages(max_message_count=10, max_wait_time=5)
time.sleep(100)  # message handling for long period (E.g. application logic)
for msg in received_msgs:
    receiver.complete_message(msg)

要了解有关每个参数的更多信息,请参阅azure -sdk-for-python 中的 auto_lock_renew.py。

还要从这里检查_auto_lock_renew_task的定义 。

建议:尝试增加Lock Period,减少renewlockperiod。


推荐阅读