首页 > 解决方案 > 添加消费者时的 MassTransit

问题描述

我试图了解以不完全依赖这个抽象层库的方式使用 MassTransit 时会发生什么,而是要真正了解和理解在幕后创建的内容及其背后的原因。

在我的应用程序中,我通过以下方式注册消费者:

container.AddMassTransit(x =>
{
    x.AddBus(context => Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        var host = cfg.Host(configurationProvider.RabbitHostName);

        x.AddConsumer<FactAddedHandler>();
        x.AddConsumer<FactAddedOrderHandler>();
        x.AddConsumer<FactCategoryHandler>();

        cfg.ConfigureEndpoints(container);
    }));

});

跟我来,到目前为止,我们有这样的场景:

在此处输入图像描述

当我检查正在通过哪些队列创建时

rabbitmqctl list_queues 名称消息消费者

我懂了:

FactAddedHandler     0       1
FactAddedOrderHandler        0       1
FactCategoryHandler  0       1

这让我相信每个特定消费者都有一个队列。所以我们有这样的场景:

在此处输入图像描述

让我们看看这些处理程序是如何定义的:

internal class FactAddedHandler : IConsumer<FactAddedIntegrationEvent>
{
    //
}


internal class FactAddedOrderHandler : IConsumer<FactAddedIntegrationEvent>
{
    //
}


internal class FactCategoryHandler : IConsumer<FactCategoryIntegrationEvent>
{
    //
}

因此,前 2 个处理程序(FactAddedHandlerFactAddedOrderHandler)订阅同一个事件(FactAddedIntegrationEvent),而另一个(FactCategoryHandler **)订阅另一个事件(**FactCategoryIntegrationEvent)。

因此,我希望在广播FactAddedIntegrationEvent的前 2 个队列之上“至少”有一个扇出交换。因此,让我们通过以下方式查看交易所:

rabbitmqctl list_exchanges 名称类型

结果是:

FactCategoryHandler     fanout
FactAddedOrderHandler      fanout
FactAddedHandler   fanout

IntegrationEvents:FactAddedIntegrationEvent  fanout
IntegrationEvents:FactCategoryIntegrationEvent    fanout

所以.. 我期望的是IntegrationEvents:FactAddedIntegrationEvent是广播FactAddedIntegrationEvent的扇出交换。

我还相信,masstransit 默认创建另一个交换IntegrationEvents:FactCategoryIntegrationEvent,这样可以很容易地将其他消费者添加到同一事件中,即使在我的情况下只有一个。

因此,我们最终会遇到这种情况,这仍然是有意义的:

在此处输入图像描述

我不明白并且想要解释的是创建其他 3 个剩余交易所的原因。他们的作用是什么?他们为什么在那里?提前致谢!

标签: .netrabbitmqmasstransit

解决方案


对于接收端点,MassTransit 创建一个队列和一个具有相同名称的匹配交换。队列绑定到匹配的交换器。

消费者在接收端点上使用的消息类型用于声明交换(如上所示),并且匹配的交换绑定到这些消息类型交换。所有这些交易所都是扇出交易所。

生成的拓扑通过匹配的交换路由所有消息类型,然后路由到队列。

这通过文档中的示例进行了解释。

RabbitMQ 发布拓扑

为什么要匹配交换?两个答案。

首先,简单的答案,它允许使用路由键将消息发送到交换器。使用 RabbitMQ 发送到队列需要以队列名称作为路由键的空交换。

其次,它允许将额外的队列绑定到匹配的交换机以进行故障排除。这包括设置窃听以保留发送到端点的所有消息的副本(直接或通过已发布的消息类型交换)。


推荐阅读