首页 > 解决方案 > 如何覆盖 MassTransit 默认交换和队列拓扑约定?

问题描述

正如所指出的[在我关于 SO 的一个问题中](为什么 MassTransit 中的简单配置会创建 2 个队列和 3 个交换?),MassTransit for RabbitMQ 会自动创建一定数量的队列并针对给定的简单配置进行交换:

交易所,所有扇出:

  • ConsoleApp1:Program-YourMessage: 耐用的
  • VP0003748_dotnet_bus_6n9oyyfzxhyx9ybobdmpj8qeyt:自动删除和耐用?
  • test_queue: 耐用的

队列:

  • VP0003748_dotnet_bus_6n9oyyfzxhyx9ybobdmpj8qeyt: x-过期 60000
  • test_queue: 耐用的

但是,我发现无法覆盖这些交换和队列的命名有点令人沮丧。我能做些什么来改变它吗?

例如,如果您重构某些类型或命名空间,您最终可能会污染您的 RabbitMQ 实例,其中有大量不再使用的交换 =/

我理解,test_queue因为这是我决定如此公平的事情。类型很容易受到更改/重构。

标签: c#.net-corerabbitmqmasstransit

解决方案


这是一种简单有效的方法:https ://bartwullems.blogspot.com/2018/09/masstransitchange-exchange-naming.html

但最好在这里删除一些 dotnet 核心代码,以帮助任何刚开始的人。

我们基于配置的自定义格式化程序:

public class BusEnvironmentNameFormatter : IEntityNameFormatter
{
    private readonly IEntityNameFormatter _original;
    private readonly string _prefix;

    public BusEnvironmentNameFormatter(IEntityNameFormatter original, SomeAppSettingsSection busSettings)
    {
        _original = original;
        _prefix = string.IsNullOrWhiteSpace(busSettings.Environment)
            ? string.Empty // no prefix
            : $"{busSettings.Environment}:"; // custom prefix
    }

    // Used to rename the exchanges
    public string FormatEntityName<T>()
    {
        var original = _original.FormatEntityName<T>();
        return Format(original);
    }

    // Use this one to rename the queue
    public string Format(string original)
    {
        return string.IsNullOrWhiteSpace(_prefix)
            ? original
            : $"{_prefix}{original}";
    }
}

然后要使用它,我们会做这样的事情:

var busSettings = busConfigSection.Get<SomeAppSettingsSection>();
var rabbitMqSettings = rabbitMqConfigSection.Get<SomeOtherAppSettingsSection>();

services.AddMassTransit(scConfig =>
{
    scConfig.AddConsumers(consumerAssemblies);

    scConfig.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(rmqConfig =>
    {
        rmqConfig.UseExtensionsLogging(provider.GetRequiredService<ILoggerFactory>());

        // Force serialization of default values: null, false, etc
        rmqConfig.ConfigureJsonSerializer(jsonSettings =>
        {
            jsonSettings.DefaultValueHandling = DefaultValueHandling.Include;
            return jsonSettings;
        });

        var nameFormatter = new BusEnvironmentNameFormatter(rmqConfig.MessageTopology.EntityNameFormatter, busSettings);
        var host = rmqConfig.Host(new Uri(rabbitMqSettings.ConnectionString), hostConfig =>
        {
            hostConfig.Username(rabbitMqSettings.Username);
            hostConfig.Password(rabbitMqSettings.Password);
        });

        // Endpoint with custom naming
        rmqConfig.ReceiveEndpoint(host, nameFormatter.Format(busSettings.Endpoint), epConfig =>
        {
            epConfig.PrefetchCount = busSettings.MessagePrefetchCount;
            epConfig.UseMessageRetry(x => x.Interval(busSettings.MessageRetryCount, busSettings.MessageRetryInterval));
            epConfig.UseInMemoryOutbox();

            //TODO: Bind messages to this queue/endpoint
            epConfig.MapMessagesToConsumers(provider, busSettings);
        });

        // Custom naming for exchanges
        rmqConfig.MessageTopology.SetEntityNameFormatter(nameFormatter);
    }));
});

推荐阅读