首页 > 解决方案 > 如何使用 .Net TPL DataFlow 或响应式扩展同时处理发布到 RabbitMQ 的动态消息?

问题描述

对于我的应用程序架构,我有一个 API,它根据收到的请求向 RabbitMQ 发布消息。Windows 服务充当 RabbitMQ 的消费者,它处理消息并更新数据库,或者在某些情况下使用 SignalR 回调。

一旦通过服务接收到消息,就会生成多个文件,几个数据库更新和处理,最后在数据准备好后调用外部 API。

我正在考虑使用 Semaphore Slim 库来并行处理消息并控制线程节流。但是当我阅读几篇文章时,我了解到 .NET TPL DataFlow 或 Reactive Extensions 可以很好地处理这种情况。但我很困惑,选择什么?

我在网上找到的所有指南和参考资料都讨论了处理一个已经可用并要执行的消息列表。但就我而言,消息是动态的,应该在调用 RabbitMQ .Net 客户端的 EventHanlder 委托时进行处理。

这是我使用消息的代码片段

var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);
                
                ProcessMessage(message);

                channel.BasicAck(ea.DeliveryTag, true);
            };

你能指导我这个场景是否可以完成,或者它是否会使我的整个模型过于复杂?

标签: c#parallel-processingrabbitmqsystem.reactivetpl-dataflow

解决方案


恕我直言,RX 提倡一种纯粹的/功能性的/无副作用的方法,而 TPL 数据流允许更自由地使用“肮脏”的解决方法。因此,尽管您可以使用其中任何一种来解决手头的问题,但使用 RX,您最终会得到一个更漂亮的解决方案,而使用 TPL Dataflow,您最终会得到一个更快的工作解决方案。此外,如果您对这些库一无所知,TPL Dataflow的学习曲线更流畅,其文档由 Microsoft 熟悉和维护,而RX有一个不熟悉的集中式文档,适用于所有平台和语言。


推荐阅读