首页 > 解决方案 > 我在我的服务总线队列中调用了一个长时间运行的进程。我希望它持续超过 5 分钟

问题描述

我有一个长时间运行的进程,它在数百万条记录之间执行匹配,我使用服务总线调用此代码,但是当我的进程超过 5 分钟限制时,Azure 再次从头开始处理已经处理的记录。

我怎样才能避免这种情况

这是我的代码:

private static async Task ProcessMessagesAsync(Message message, CancellationToken token)
 {
   long receivedMessageTrasactionId = 0;
   try
   {
     IQueueClient queueClient = new QueueClient(serviceBusConnectionString, serviceBusQueueName, ReceiveMode.PeekLock);

     // Process the message
     receivedMessageTrasactionId = Convert.ToInt64(Encoding.UTF8.GetString(message.Body));

     // My Very Long Running Method
     await DataCleanse.PerformDataCleanse(receivedMessageTrasactionId);
            //Get Transaction and Metric details

     await queueClient.CompleteAsync(message.SystemProperties.LockToken);
   }
   catch (Exception ex)
   {
     Log4NetErrorLogger(ex);
     throw ex;
   }
}

标签: c#azureparallel-processingazureservicebus

解决方案


消息用于通知而不是长时间运行的处理。

你有几个选项:

  1. 接收消息并依靠接收者的RenewLock()操作来延长锁。
  2. 使用用户回调 API 并通过MessageHandlerOptions.MaxAutoRenewDuration设置自动更新消息的锁定来指定最大处理时间(如果已知)。
  3. 记录处理开始但不完成传入消息。而是利用消息延迟功能,向自己发送一条新的延迟消息,并引用延迟消息SequenceNumber。这将允许您定期收到“提醒”消息以查看工作是否完成。如果是,则通过其完成延迟消息SequenceNumber。否则,在发送新消息的同时完成“提醒”消息。这种方法需要一定程度的架构重新设计。
  4. 与选项 3 类似,但将处理卸载到稍后将报告状态的外部进程。有一些框架可以帮助你。MassTransit 或 NServiceBus。后者有一个示例,您可以下载和使用。

请注意,选项 1 和 2不能保证,因为它们是客户端启动的操作。


推荐阅读