首页 > 解决方案 > 处理 SQS 项目队列的多线程方法

问题描述

在这个场景中,我必须从队列中轮询 AWS SQS 消息,每个异步请求最多可以获取 10 个 sqs 项目/消息。轮询项目后,我必须在 kubernetes pod 上处理这些项目。项目处理包括从几个 API 调用中获取响应,这可能需要一些时间然后将项目保存到 DB 和 S3。我做了一些研发并得出以下结论

  1. 要使用消费者生产者模型,1 个线程将轮询项目,另一个线程将处理项目或使用多线程进行项目处理
  2. 维护一个数据结构,其中包含准备处理的 sqs 轮询项目,DS 可以是阻塞收集或并发队列
  3. 使用任务并行库进行线程池和项目处理。
  4. 可以使用通道

我的查询

  1. 什么是实现最佳性能或提高 TPS 的最佳方法。
  2. 我可以/应该使用数据流 TPL
  3. 带异步任务的多线程或单线程

标签: c#multithreading.net-coretask-parallel-librarytpl-dataflow

解决方案


这在很大程度上取决于您的用例的具体情况以及您想要投入多少精力。

然而,我将解释我在做出这样的决定时会使用的思考过程。

处理 SQS 消息的简单解决方案是按顺序一次处理一个(即没有并发)。这并不意味着您一次只能接收一条消息,因为您可以向集群添加更多 pod。

因此,即使在那个幼稚的解决方案中,您也有一个可以利用的并发点,但它有很多开销。减少开销的方法通常是利用相同的开销,但用它处理更多的消息。这就是为什么,例如,SQS 允许您在一次调用中获得 1-10 条消息,而不仅仅是一条。它将调用开销分散到 10 条消息中。在简单的解决方案中,开销是启动整个过程的成本。将进程用于更多消息意味着并发处理。

我发现对于稳定和灵活的并发,您需要许多并发点,但每个点都以可配置的并行度为上限(无论是硬编码还是实际配置)。这样,您可以调整它们中的每一个以实现最佳输出(当您有空闲 CPU 和内存时增加,否则减少)。

那么,在哪里可以引入额外的并发呢?这是一个进步,每一步都可以更好地利用资源,但需要更多的努力。

  • 为每个 SQS API 调用获取 10 条消息而不是一条消息并同时处理它们。这样你就可以控制 2 个并发点:Pod 的数量、消息的数量(最多 10 条)并发。
  • 有几个任务,每个任务获取 1-10 个任务并同时处理它们。这是 3 个并发点:Pod、任务和每个任务的消息。这两种解决方案都受到处理时间不同的消息的影响,这意味着单个长时间运行的消息将“阻止”所有其他 1-9 个工作“槽”,从而有效地将并发性降低到低于配置的水平。
  • 设置一个 TPL 数据流块以同时处理消息,并设置一个(或少数几个)任务连续获取消息并泵入块中。请记住,需要显式删除 SQS 消息,因此块也需要接收消息句柄,以便在处理后可以删除消息。
  • TPL 数据流“管道”由几个块组成,每个块都有自己的并发度。当您有不同的消息处理步骤且每个步骤都有不同的限制(例如,具有不同限制配置的不同 API)时,这很有用。

我个人非常喜欢并且对 Dataflow 库感到满意,所以我会直接使用它。但是,当性能不是问题时,更简单的解决方案也是有效的。


推荐阅读