首页 > 解决方案 > Azure 服务总线 - 将参数传递给消息处理程序

问题描述

在 Azure 服务总线中,我需要同时监听来自不同服务总线的多个订阅的消息。
为此,我创建了一个列表,其中包含带有连接字符串、主题、订阅名称和一些其他信息的对象(该列表称为“作业”)。
然后,对于此列表中的每个项目,我将创建一个不同的任务来创建 ServiceBusClient 和处理器。

            var jobs = GetAllServiceBusTopics();
            Parallel.ForEach(jobs, async job =>
            {
                var client = new ServiceBusClient(job.Environment.ServiceBusConnectionString);
                var options = new ServiceBusProcessorOptions();
                var processor = client.CreateProcessor(job.Environment.TopicName, _subscriptionName, new ServiceBusProcessorOptions());

                try
                {
                    processor.ProcessMessageAsync += MessageHandler;
                    //Pass the job object somehow to the "MessageHandler" below.
                    processor.ProcessErrorAsync += ErrorHandler;
                    await processor.StartProcessingAsync();

                    Console.WriteLine("Wait for a minute and then press any key to end the processing");
                    Console.ReadKey();

                    Console.WriteLine("\nStopping the receiver...");
                    await processor.StopProcessingAsync();
                    Console.WriteLine("Stopped receiving messages");
                }
                finally
                {
                    await processor.DisposeAsync();
                    await client.DisposeAsync();
                }
            });

如果有新消息到达,则调用的处理程序:

    static async Task MessageHandler(ProcessMessageEventArgs args)
    {
         //I need the "job" object from my loop above here.
    }

我在微软的这个网站上了解到这个概念的一般工作原理。

我的第一个问题:

  1. 这种方法可以吗,还是我跑错了方向?我可以这样做吗?

但即使这样没问题,我还有另一个更重要的任务:

我需要以某种方式将循环中的“作业”对象作为参数传递给消息处理程序。
但我目前不知道如何归档这个。对此有何建议?

标签: c#azureservicebusazure-servicebus-topics

解决方案


这种方法可以吗,还是我跑错了方向?我可以这样做吗?

是的,你可以这样做。要记住的一件事是您实例化了多个ServiceBusClient实例,每个实例都会导致建立一个新连接,而不是使用相同的连接。我不知道主题(工作)的数量可能有多大,但如果它很大,你最终会遇到连接饥饿。

我需要以某种方式将循环中的“作业”对象作为参数传递给消息处理程序。但我目前不知道如何归档这个。对此有何建议?

这不是ServiceBusProcessor设计的方式。除了需要处理的传入消息之外,它不接收任何其他内容。如果您需要一个作业 ID,那应该是消息有效负载/元数据的一部分。如果您需要知道它来自哪个实体,您可以添加订阅过滤器操作以添加带有标识符的自定义标头。另一种方法需要包装ServiceBusProcessor以保留作业 ID/订阅标识符并在事件处理程序中使用它。


推荐阅读