首页 > 解决方案 > MassTransit 消息处理并发性

问题描述

我正在尝试使用 MassTransit 解决问题,但我找不到正确的方法。我将尝试更详细地解释该场景。我们的系统能够导入大量数据(记录)。一方面是 API 门面。Facade 使用 MassTransit 将大量特定消息生成到系统中。处理消息可能意味着更多的事情。例如(创建记录、更新记录、使用记录进行业务操作……)。对于某些场景,例如更新,必须实现并发。这意味着,如果有更多使用特定 RecordID 的更新消息,则不允许处理的消息多于一条!使用这种方法,我们希望避免数据库端的事务锁定。使用消息分区,我们解决了并发问题。缺点是,

标签: concurrencyrabbitmqmasstransit

解决方案


我认为您需要记住,即使您完全删除并发,使用队列的发布订阅也不能保证有序处理。例如,RMQ 集群可以进入分区状态,然后恢复,然后您可能会收到一条消息两次,而第二条消息可能比预期晚得多。此外,大多数消息代理为您提供“至少一次”传递,这意味着您可以多次收到一条消息。考虑到这一点,确保您的消息处理消费者能够确保幂等性是一个好主意。

使用 MassTransit,您只能通过使用分区过滤器来缓解问题。在文档中,您只能找到Saga Guidelines中提到的分区器,但它是相同的概念 - 避免并发处理特定字段中具有相同值的消息。

但是,分区只发生在服务实例中,这意味着如果您有多个运行相同服务的实例(竞争消费者),分区将不起作用。

如果您想要线性化和顺序的消息处理,则需要使用事件日志样式的基础架构。由于它不完全适合核心 MassTransit 模型,因此支持Riders。在那里,您可以在 Kafka 的 Azure 事件中心之间进行选择。请记住,这些不是代理,而是事件日志,因此尽管您可以通过使用多个分区来提高整体性能,但每个分区的消费者独立运行。如果某个消费者无法处理来自其分区的消息,则该分区将完全停止处理消息。当然,您需要使用那些要线性化的 id 作为分区键。


推荐阅读