首页 > 解决方案 > MassTransit/RabbitMQ - 将被拒绝的消息移动到死信并重新发送

问题描述

我目前正在开展一个项目,我们希望使用死信队列来重新传递消息,而不是使用 MassTransit 的内置调度程序。

这个想法很简单:如果消费者抛出一组特定的异常,我们会多次重试消息,然后希望重新安排消息以在(不久的)将来重新传递。例如,如果消息以错误的顺序到达。我正在尝试完成本文中的选项 #2 ( https://engineering.nanit.com/rabbitmq-retries-the-full-story-ca4cc6c5b493 )

每当我的消费者抛出异常时,MassTransit 都会将消息移动到 _error 队列而不是死信。我确定这是我的错误配置,但我不确定如何完成我想要的。

我的总线配置如下:

IBusControl ConfigureBus(IBusRegistrationContext provider)
{
    return Bus.Factory.CreateUsingRabbitMq(sbc =>
    {
        sbc.Host("localhost", c =>
        {
            c.Username("guest");
            c.Password("guest");
        });

        sbc.MessageTopology.SetEntityNameFormatter(new EntityNameFormatter("Consumer1"));

        sbc.ReceiveEndpoint("Consumer1.ActionPerformed", ep =>
        {
            ep.Consumer<ActionPerformedConsumer>();
            ep.ExchangeType = ExchangeType.Fanout;

            ep.UseMessageRetry(r => r.Immediate(1));
            ep.BindDeadLetterQueue("deadletter", "dead", cfg =>
            {
                cfg.SetExchangeArgument("x-message-ttl", TimeSpan.FromSeconds(60));
                cfg.Durable = true;
            });
        });
    });
}

标签: c#rabbitmqmasstransit

解决方案


要更改 MassTransit 的错误队列行为,您可以配置接收端点以将错误传播回传输,这会将错误返回给代理。

ep.RethrowFaultedMessages();

但是,我很确定 MassTransit 会使用requeue设置为的参数来执行此操作,true以便在正常情况下不会丢失消息。我不相信目前有办法覆盖它。

似乎如果这是一种适用于标准队列的方法(当然,它不适用于仲裁队列,它们不支持 TTL),那么可能值得将这种类型的配置添加到 RabbitMQ 以便它构建排除交易所等来处理它作为延迟交易所调度程序的替代方案。

更新,我认为选项 3 优于选项 2,并且更符合 MassTransit 的工作方式。


推荐阅读