c# - MassTransit/RabbitMQ - 将被拒绝的消息移动到死信并重新发送
问题描述
我目前正在开展一个项目,我们希望使用死信队列来重新传递消息,而不是使用 MassTransit 的内置调度程序。
这个想法很简单:如果消费者抛出一组特定的异常,我们会多次重试消息,然后希望重新安排消息以在(不久的)将来重新传递。例如,如果消息以错误的顺序到达。我正在尝试完成本文中的选项 #2 ( https://engineering.nanit.com/rabbitmq-retries-the-full-story-ca4cc6c5b493 )
每当我的消费者抛出异常时,MassTransit 都会将消息移动到 _error 队列而不是死信。我确定这是我的错误配置,但我不确定如何完成我想要的。
我的总线配置如下:
IBusControl ConfigureBus(IBusRegistrationContext provider)
{
return Bus.Factory.CreateUsingRabbitMq(sbc =>
{
sbc.Host("localhost", c =>
{
c.Username("guest");
c.Password("guest");
});
sbc.MessageTopology.SetEntityNameFormatter(new EntityNameFormatter("Consumer1"));
sbc.ReceiveEndpoint("Consumer1.ActionPerformed", ep =>
{
ep.Consumer<ActionPerformedConsumer>();
ep.ExchangeType = ExchangeType.Fanout;
ep.UseMessageRetry(r => r.Immediate(1));
ep.BindDeadLetterQueue("deadletter", "dead", cfg =>
{
cfg.SetExchangeArgument("x-message-ttl", TimeSpan.FromSeconds(60));
cfg.Durable = true;
});
});
});
}
解决方案
要更改 MassTransit 的错误队列行为,您可以配置接收端点以将错误传播回传输,这会将错误返回给代理。
ep.RethrowFaultedMessages();
但是,我很确定 MassTransit 会使用requeue
设置为的参数来执行此操作,true
以便在正常情况下不会丢失消息。我不相信目前有办法覆盖它。
似乎如果这是一种适用于标准队列的方法(当然,它不适用于仲裁队列,它们不支持 TTL),那么可能值得将这种类型的配置添加到 RabbitMQ 以便它构建排除交易所等来处理它作为延迟交易所调度程序的替代方案。
更新,我认为选项 3 优于选项 2,并且更符合 MassTransit 的工作方式。
推荐阅读
- ruby-on-rails - How can I update two User objects in the same Rails form_for?
- angular - Firestore Angular 获取文档中的所有集合名称
- angular - 当我运行 ng serve 时找不到模块'@angular/compiler-cli/ngc'
- regex - Angular 验证器模式正则表达式,用于匹配一个或另一个规则,但不能同时匹配两者
- json - 使用 jq 连接多个文件中的 JSON 数组
- javascript - 按sharepoint 2013按组获取列表项
- api - 如何限制对 Google App Engine 的外部访问并仅允许 Apigee Edge?
- date - 日期的条件格式
- php - 除非清除 Cookie,否则登录页面会超时
- meteor - 启用汇总的 Meteor 1.9 构建中断