c# - 我在我的服务总线队列中调用了一个长时间运行的进程。我希望它持续超过 5 分钟
问题描述
我有一个长时间运行的进程,它在数百万条记录之间执行匹配,我使用服务总线调用此代码,但是当我的进程超过 5 分钟限制时,Azure 再次从头开始处理已经处理的记录。
我怎样才能避免这种情况
这是我的代码:
private static async Task ProcessMessagesAsync(Message message, CancellationToken token)
{
long receivedMessageTrasactionId = 0;
try
{
IQueueClient queueClient = new QueueClient(serviceBusConnectionString, serviceBusQueueName, ReceiveMode.PeekLock);
// Process the message
receivedMessageTrasactionId = Convert.ToInt64(Encoding.UTF8.GetString(message.Body));
// My Very Long Running Method
await DataCleanse.PerformDataCleanse(receivedMessageTrasactionId);
//Get Transaction and Metric details
await queueClient.CompleteAsync(message.SystemProperties.LockToken);
}
catch (Exception ex)
{
Log4NetErrorLogger(ex);
throw ex;
}
}
解决方案
消息用于通知而不是长时间运行的处理。
你有几个选项:
- 接收消息并依靠接收者的
RenewLock()
操作来延长锁。 - 使用用户回调 API 并通过
MessageHandlerOptions.MaxAutoRenewDuration
设置自动更新消息的锁定来指定最大处理时间(如果已知)。 - 记录处理开始但不完成传入消息。而是利用消息延迟功能,向自己发送一条新的延迟消息,并引用延迟消息
SequenceNumber
。这将允许您定期收到“提醒”消息以查看工作是否完成。如果是,则通过其完成延迟消息SequenceNumber
。否则,在发送新消息的同时完成“提醒”消息。这种方法需要一定程度的架构重新设计。 - 与选项 3 类似,但将处理卸载到稍后将报告状态的外部进程。有一些框架可以帮助你。MassTransit 或 NServiceBus。后者有一个示例,您可以下载和使用。
请注意,选项 1 和 2不能保证,因为它们是客户端启动的操作。
推荐阅读
- oracle - 我有一个要求,我必须在一周的每一天显示特定日期和时间之间的记录
- python - ValueError:使用序列设置数组元素。在 Keras 模型.fit
- assembly - 使用 PUSH {lr} 和 POP {pc} 从 ARM 中的 main/_start 函数返回
- javascript - 如何通过存储在索引处的嵌套对象属性对对象数组进行排序?
- python - 为什么这个函数返回的值与它打印的不同?
- php - 如何将 PHP 变量写为 HTML 属性值?
- python - 需要了解代码中 [::-1] 的用途是什么?
- svg - 如何获取里面一个角色的风格
- python - 如何在 Tensorflow 中创建自定义图像数据集
- python - 通过 pandas 操作访问特定组中的组