首页 > 解决方案 > 在生产者 - 消费者模式中寻找用于公平数据处理的理想 Azure 服务

问题描述

我正在尝试处理下图所示的情况。 在此处输入图像描述

我们有几个客户 = 生产者(还有 1 个客户 = 1 到 N 个生产者)。我们需要公平地处理来自每个生产者的数据(= 单独将是最好的选择),因为可能会发生一个生产者会向我们发送大量数据,如果所有生产者只有一个队列,其余的生产者将被阻止。所以我们需要确保没有生产者会被消费者公平地阻塞和服务。

理想的模式是每个生产者都有自己的队列和自己的消费者(异步等待将发送到队列的任何消息)。此架构由上图提供。

问题是生产者的数量会随着客户的数量而动态增长,所以我们也需要动态地创建队列和消费者。

此外,我们需要处理来自生产者的数据的另一个困难。我们需要确保生产者 A 的旧数据将在同一生产者 A 的新数据之前处理。(注意:生产者彼此独立,因此生产者 B 的新数据可以在生产者 A 的旧数据之前处理。)

根据我的研究,将 Azure Queue 存储用作 Queue 不会有任何问题(通过 动态创建队列不是问题queue.CreateIfNotExists();)。但我不知道作为合适的消费者使用什么。我知道有很多 Azure 服务,例如:Azure 函数、Azure WebJob、Azure 事件中心……等等。

我的问题是:这个用例作为消费者使用的最佳选择是什么?

我们需要尽可能公平地为队列提供服务,使任何生产者的队列都不会被其他人阻塞。

提前感谢您的任何提示!

更新

我再次考虑了这个用例,它产生了新的模式,见下图:

在此处输入图像描述

与之前的schema最大的不同就是Producer、Queue和Consumer之间没有1:1:1的关系。每个生产者不需要自己的队列和消费者。

将只有一个“主”队列,生产者将在其中发送元消息(“我已将批量 XY 发送到表存储 A”)。队列还会触发 WebJob,其主要任务是将信息发送到“服务总线/事件网格/事件中心”(我只是不确定哪个是最佳选择)。

服务总线/事件网格/事件中心将触发 Azure 功能,该功能将在那里执行“消费者”工作。它将从表存储中获取数据,进行一些转换并将其插入另一个结构。

WebJob 也会防止同时处理来自同一个生产者的两个微批次的情况。WebJob 将推迟其他批次,直到处理完最后一批。

实际上,不是服务总线/事件网格/事件中心可能只是 WebJob,它有一些线程池(消费者),它会唤醒每个生产者的消费者。尽管如此,我不认为这是扩展客户数量的最佳选择,因为 WebJob 的资源不是无限的。

最好的选择是上述结构之一(服务总线/事件中心/事件网格)。例如,每个生产者在服务总线中都有自己的主题,每个主题都会触发自己的 Azure 功能(这将是消费者)。

我想知道这是否是正确的方法?

标签: c#azureazure-devopsqueueproducer-consumer

解决方案


这是一个有趣的用例,但在我看来,Functions 将是消费者的最佳选择。我会将业务逻辑封装在一个单独的文件中,因此所有单独的函数都有队列绑定和对该单独文件的调用:

File 1:
public static Task DoStuff(string myQueueItem, ILogger log)
{
    //tranforms here

}

File 2 - n:

[FunctionName("ConsumerA")]
public static void QueueTrigger(
    [QueueTrigger("myqueue-items")] string myQueueItem,
    ILogger log)
{
    await DoStuff(myQueueItem, log);

}

除了队列创建/删除,您还可以使用管理 API来创建和删除函数。每次创建一个时,您都会调整名称和队列名称,使其绑定到新队列。

您可以通过更改 host.json文件中的“maxOutstandingRequests”属性来控制并行处理。在消费计划中,最大请求数将针对应用程序的每个实例 - 如果您将设置设置为 1 并且主机扩展到三个实例,则一次将处理三个消息。如果您使用的是标准应用服务计划,则缩放略有不同,但仍可以动态完成。消费者实例的动态扩展可能有助于处理不一致的数据量——大容量数据转储将被更快地处理,但当事情安静时,您不必有资源闲置。

当主机横向扩展时,每个实例都包含每个函数的副本,因此由于大量数据转储而增加吞吐量实际上会增加其他队列的可用吞吐量,而不是减少它。


推荐阅读