python - Python Azure 服务总线锁已过期
问题描述
我正在使用 azure 服务总线,我的锁已过期。
我如何实现锁定到 1 天,但我仍然收到错误
我的代码:
import json
from typing import Any, Dict, Generator, List, Optional
from azure.servicebus import (AutoLockRenewer, ServiceBusClient,
ServiceBusMessage)
class Queue():
def __init__(
self,
connection_string: Optional[str] = None,
queue_name: Optional[str] = None,
max_lock_renewal_duration: Optional[int] = 86400,
logging_enable: Optional[bool] = True
):
self.queue_name = queue_name
self.connection_string = connection_string
self.logging_enable = logging_enable
self.max_lock_renewal_duration = max_lock_renewal_duration
self.client = ServiceBusClient.from_connection_string(
conn_str=self.connection_string, logging_enable=logging_enable)
self.sender = self.client.get_queue_sender(queue_name=self.queue_name)
self.renewer = AutoLockRenewer(
max_lock_renewal_duration=max_lock_renewal_duration)
self.receiver = self.client.get_queue_receiver(
queue_name=self.queue_name, auto_lock_renewer=self.renewer)
def get_msgs(
self,
max_message_count: Optional[int] = 1,
max_wait_time: Optional[int] = 5
) -> Generator[ServiceBusMessage, None, None]:
for msg in self.receiver.receive_messages(
max_message_count=max_message_count,
max_wait_time=max_wait_time
):
yield msg
def abandon_msg(self, msg: ServiceBusMessage) -> None:
self.receiver.abandon_message(msg)
q = Queue(connection_string="secret",
queue_name="my_queue",
logging_enable=True
)
for msg in q.get_msgs(max_message_count=100, max_wait_time=5):
print("process my messages")
print("some reason abondan")
错误:
--- 记录错误 --- 回溯(最后一次调用):
文件“/test/.env/lib64/python3.6/site-packages/azure/servicebus/_servicebus_receiver.py”,第 782 行,在放弃消息 self._settle_message_with_retry(message, MESSAGE_ABANDON) 文件“/test/.env/lib64/ python3.6/site-packages/azure/servicebus/_servicebus_receiver.py",第 415 行,在 _settle_message_with_retry 错误=message.auto_renew_error,azure.servicebus.exceptions.ServiceBusError:消息锁上的锁已过期。
我的目标是放弃消息,以便它重试,还有一种方法可以增加重试时间,而不是像每个放弃消息的延迟 300 秒那样立即排队?
解决方案
默认情况下,锁定时间将设置为 30 秒,我们可以将其增加到 5 分钟。
将锁定期设置为 1 天将被控制台本身拒绝,如下图所示:
更新到最大锁定时间后的屏幕截图如下:
但是,锁可以更新,直到函数完成。因为我们可以使用自动锁更新功能,您可以在其中指定要继续更新锁的持续时间。
下面的代码将帮助我们自动更新会话锁定100 秒
renewer.register(receiver, receiver.session, max_lock_renewal_duration=100)
print('Register session into AutoLockRenewer.')
received_msgs = receiver.receive_messages(max_message_count=10, max_wait_time=5)
time.sleep(100) # message handling for long period (E.g. application logic)
for msg in received_msgs:
receiver.complete_message(msg)
要了解有关每个参数的更多信息,请参阅azure -sdk-for-python 中的 auto_lock_renew.py。
还要从这里检查_auto_lock_renew_task的定义 。
建议:尝试增加Lock Period,减少renewlockperiod。
推荐阅读
- javascript - 使用 Puppeteer 和 Node 从 DOM 中选择元素
- java - 使用 System.in.read() 进行多次迭代
- asp.net-core - ASP.net Core HTTP.sys 不适用于 https
- javascript - 如何创建新的 EventEmitter 并注册“打开”和“关闭”事件监听器?
- html - 下拉菜单不会与 CSS 一起显示
- python - LogisticRegression 中的标签索引需要 Pyspark 澄清
- ansible - 如何处理 ansible 库存文件的 ansible_ssh_pass 变量中的特殊字符?
- python - 您对二进制时间序列的信号处理有什么建议吗?
- python - 我可以为 numpy 预计算/预生成伪随机数吗?
- database - Flutter中可以存储列表的数据库