首页 > 解决方案 > 如何在 azure 服务总线主题接收器中处理取消令牌?

问题描述

我有一个场景,我正在调用 Azure 服务总线库的 SubscriptionClient 类的 RegisterMessageHandler。基本上,我使用基于触发器的方法,同时在 Service Fabric Environment 中的一项服务中接收来自 Service Bus 的消息作为无状态服务。因此,我不会立即关闭 subscriptionClient 对象,而是在服务的整个生命周期内将其保持打开状态,以便它继续接收来自 azure 服务总线主题的消息。

当服务需要关闭时(由于某些原因),我想处理传递给 Service Fabric 服务的取消令牌。

我的问题是如何处理 RegisterMessageHandler 方法中的取消令牌,每当收到新消息时都会调用该方法?我还想“优雅地”处理订阅客户端的关闭,即我希望如果一条消息已经在处理,那么我希望该消息得到完全处理,然后我想关闭连接。下面是我正在使用的代码。

目前我们正在遵循以下方法: 1.使用信号量锁锁定消息进程并在finally块中释放锁。2. 每当取消完成时,调用cancellationToken.Register 方法来处理取消令牌。释放注册方法中的锁。

public class AzureServiceBusReceiver
{
  private SubscriptionClient subscriptionClient;
  private static Semaphore semaphoreLock;

public AzureServiceBusReceiver(ServiceBusReceiverSettings settings)
{
    semaphoreLock = new Semaphore(1, 1);
    subscriptionClient = new SubscriptionClient(
        settings.ConnectionString, settings.TopicName, settings.SubscriptionName, ReceiveMode.PeekLock);
}

public void Receive(
    CancellationToken cancellationToken)
{
    var options = new MessageHandlerOptions(e =>
    {

        return Task.CompletedTask;
    })
    {
        AutoComplete = false,

    };

    subscriptionClient.RegisterMessageHandler(
        async (message, token) =>
        {
            semaphoreLock.WaitOne();
            if (subscriptionClient.IsClosedOrClosing)
                return;
            CancellationToken combinedToken = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, token).Token;
            try
            {
                // message processing logic
            }
            catch (Exception ex)
            {
                await subscriptionClient.DeadLetterAsync(message.SystemProperties.LockToken);
            }
            finally
            {
                semaphoreLock.Release();
            }
        }, options);


    cancellationToken.Register(() =>
    {
        semaphoreLock.WaitOne();
        if (!subscriptionClient.IsClosedOrClosing)
            subscriptionClient.CloseAsync().GetAwaiter().GetResult();
        semaphoreLock.Release();
        return;
    });
} 
}

标签: azureazure-service-fabricazureservicebusazure-servicebus-topicscancellationtokensource

解决方案


将消息客户端实现为ICommunicationListener,因此当服务关闭时,您可以阻塞调用,直到消息处理完成。不要使用静态信号量,这样您就可以在项目中安全地重用代码。

是一个如何做到这一点的例子。

这是由该代码创建的 Nuget 包

并随时贡献!


推荐阅读