首页 > 解决方案 > RabbitMQ + MassTransit:如何从处理中取消排队的消息?

问题描述

在某些特殊情况下,我需要以某种方式在接收点告诉消费者某些消息不应该被处理。否则两个系统将变得不同步(我们处理一些过时的外部系统,例如,如果连接断开,我们必须丢弃该连接范围内的所有排队操作)。

冒险并手动解决问题消息?补偿行动(在我的情况下可能很难支持)?还要别的吗?

标签: rabbitmqnservicebusmasstransitmessage-bus

解决方案


有几种方法:

  • 您可以在发送消息时设置生存时间:await endpoint.Send(myMessage, c => c.TimeToLive = TimeSpan.FromHours(1));,但这将适用于像这样发送(或发布)的所有消息。在查看您的要求后,我会考虑这一点。这是技术性的,但它是一种正确的消息传递模式。

  • 为您的消息本身设置 TTL 和生成时间戳属性,并让消费者决定该消息是否仍然值得处理。这是更多的业务,并且可能是最正确的方法。

  • 结合技术和业务 - 将时间戳和 TTL 保留在消息头中,这样它们就不会污染您的消息合约,并使用自定义中间件将它们过滤掉。在这种情况下,您需要小心记录此类丢弃,这样您就不会想知道为什么消息会不时消失。

  • 几乎任何不可靠的集成都可以使用 sagas 进行监控,并带有超时。例如,我们使用 saga 与 Twilio 集成。由于我们无法为它们打开 webhook,因此我们会在一段时间后轮询以检查消息状态。您可以在收到消息时启动 saga 并安排消息以检查处理是否仍在等待。正如评论中所讨论的,您可以使用“需要人工干预”的方式来解决问题,或者让 saga 决定放弃该消息。

  • 类似的方法可能是使用查找表,在其中放置与处理无关的消息列表。这样的表将类似于 sagas 列表。似乎这种方式也需要调度。在这里和传奇中,我建议对DropIt消息使用单独的接收端点(队列),只有一个消费者。它将防止DropIt消息卡在等待处理的集成消息后面(有些应该已经被丢弃)

  • 使用 RMQ 管理 API 从队列中删除消息。这是最糟糕的方法,我不会推荐它。


推荐阅读