azure - 如何在 azure 服务总线主题接收器中处理取消令牌?
问题描述
我有一个场景,我正在调用 Azure 服务总线库的 SubscriptionClient 类的 RegisterMessageHandler。基本上,我使用基于触发器的方法,同时在 Service Fabric Environment 中的一项服务中接收来自 Service Bus 的消息作为无状态服务。因此,我不会立即关闭 subscriptionClient 对象,而是在服务的整个生命周期内将其保持打开状态,以便它继续接收来自 azure 服务总线主题的消息。
当服务需要关闭时(由于某些原因),我想处理传递给 Service Fabric 服务的取消令牌。
我的问题是如何处理 RegisterMessageHandler 方法中的取消令牌,每当收到新消息时都会调用该方法?我还想“优雅地”处理订阅客户端的关闭,即我希望如果一条消息已经在处理,那么我希望该消息得到完全处理,然后我想关闭连接。下面是我正在使用的代码。
目前我们正在遵循以下方法: 1.使用信号量锁锁定消息进程并在finally块中释放锁。2. 每当取消完成时,调用cancellationToken.Register 方法来处理取消令牌。释放注册方法中的锁。
public class AzureServiceBusReceiver
{
private SubscriptionClient subscriptionClient;
private static Semaphore semaphoreLock;
public AzureServiceBusReceiver(ServiceBusReceiverSettings settings)
{
semaphoreLock = new Semaphore(1, 1);
subscriptionClient = new SubscriptionClient(
settings.ConnectionString, settings.TopicName, settings.SubscriptionName, ReceiveMode.PeekLock);
}
public void Receive(
CancellationToken cancellationToken)
{
var options = new MessageHandlerOptions(e =>
{
return Task.CompletedTask;
})
{
AutoComplete = false,
};
subscriptionClient.RegisterMessageHandler(
async (message, token) =>
{
semaphoreLock.WaitOne();
if (subscriptionClient.IsClosedOrClosing)
return;
CancellationToken combinedToken = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, token).Token;
try
{
// message processing logic
}
catch (Exception ex)
{
await subscriptionClient.DeadLetterAsync(message.SystemProperties.LockToken);
}
finally
{
semaphoreLock.Release();
}
}, options);
cancellationToken.Register(() =>
{
semaphoreLock.WaitOne();
if (!subscriptionClient.IsClosedOrClosing)
subscriptionClient.CloseAsync().GetAwaiter().GetResult();
semaphoreLock.Release();
return;
});
}
}
解决方案
推荐阅读
- python - QScrollArea.ensureWidgetVisible 方法不显示目标小部件
- javascript - 是否可以构造一个对象,以便在请求其键时引发错误?
- node.js - Monk - 查询后数据去哪了?
- malloc - STM32 HAL_UART_Transmit 动态字符串
- sql-server - 如何将一个表中的行结果映射为查询中另一个表的标题
- oracle - Salesforce Oracle 集成
- android - Kotlin 不允许多重继承。我怎样才能做等效的事情?
- assembly - x86 程序集的语法检查
- java - 如何允许站点获取访问其 URL 的机器的 IP
- javascript - 包括 Protobuf 返回“exports”是只读的