首页 > 解决方案 > 如何在不使用 Send 和 RequestAddress 的情况下,根据消息内容将两条相同类型的消息发布到不同的 worker 实例?

问题描述

如何在不使用 Send 和 RequestAddress 的情况下,根据消息内容将两条相同类型的消息发布到不同的 worker 实例?

我的情况是:

我正在使用 Azure ServiceBus 和 Azure StorageTables。

我正在运行同一个worker service workera和workerb的两个不同实例。我需要 workera 和 workerb 来根据 Command.WorkerPrefix 的值使用 Command 类型的消息。

命令类型如下所示:

{
    WorkerPrefix = "a"
}

附加约束:

对于 worker 服务将使用的所有消息传递实体,我想添加一个不同的前缀,无论是 worker 还是 workerb,所以我使用cfg.SetEndpointNameFormatter(KebabCaseEndpointNameFormatter(args.[0], false))args.[0]的代码 是“a”或“b”

现在我只定义了一个消费者,它消费一条命令类型的消息

这给了我一个像这样的拓扑:

Queues:
- a-command
- b-command

Topics:
- command
  Subscriptions:
  - a-command -> fwd a-command
  - b-command -> fwd b-command

理想情况下,拓扑应该是:

Queues:
- a-command
- b-command

Topics:
- a-command
  Subscriptions:
  - a-command -> fwd a-command
- b-command
  Subscriptions:
  - b-command -> fwd b-command

这种理想拓扑结构的原因是,workera 没有权限在 workerb 的主题上创建订阅,反之亦然。如果这不可能,我可以通过在服务开始运行之前创建拓扑来解决部署时的权限问题,但更喜欢上面的拓扑。

现在,我想从 Saga 或 Future 的编排器发布两个 Command 类型的请求。一个是给工人的,一个是给工人的。

无论如何,是否可以根据消息的内容配置 IPublishEndpoint,以便发布者可以检查消息,看到 WorkerPrefix 是“a”,然后发布到 a-command 主题?

标签: azureservicebusmasstransit

解决方案


将 MassTransit 与 Azure 服务总线结合使用,我建议将消息​​路由的负担从发布者转移到消费者身上。通过配置接收端点并使用订阅过滤器,每个实例将添加自己的订阅并使用消息头来过滤已发布的消息。

在发布者上,将添加一个消息头:

await publishEndpoint.Publish(new Command(), x => x.Headers.Set("WorkerId", "A"));

消息将发布到Command主题,然后通过订阅路由到接收端点。

使用订阅过滤器

例如A,要配置接收端点,无论是手动配置接收端点还是使用消费者定义,都将配置接收端点配置器,如下所示。由于问题似乎是使用 ConfigureEndpoints,因此使用了消费者定义方法。

请注意,Microsoft.Extensions.Options用于将配置映射到容器,以便在配置总线时可用。

services.AddMassTransit(x =>
{
    x.AddConsumer<CommandConsumer, CommandConsumerDefinition>();

    x.SetEndpointNameFormatter(new KebabCaseEndpointNameFormatter("A", false));

    x.UsingAzureServiceBus((context, cfg) =>
    {
        var hostOptions = context.GetRequiredService<IOptions<Host>>();

        cfg.Host(hostOptions.Value.ConnectionString);

        cfg.ConfigureEndpoints(context);
    });
});

然后消费者定义将配置接收端点:

public class CommandConsumerDefinition :
    ConsumerDefinition<CommandConsumer>
{
    readonly string _workerId;
    readonly IEndpointNameFormatter _endpointNameFormatter;

    // using IOptions here, but somehow in the container the worker settings need to be resolved
    // to get the workerId
    public CommandConsumerDefinition(IOptions<Worker> options, IEndpointNameFormatter endpointNameFormatter)
    {
        _workerId = options.Value.WorkerId;
        _endpointNameFormatter = endpointNameFormatter;
    }

    protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<CommandConsumer> consumerConfigurator)
    {
        if (endpointConfigurator is IServiceBusReceiveEndpointConfigurator sb)
        {
            sb.ConfigureConsumeTopology = false;

            var subscriptionName = _endpointNameFormatter.Consumer<CommandConsumer>();

            sb.Subscribe<Command>(subscriptionName, 
                s => s.Filter = new SqlFilter($"WorkerId = '{_workerId}'"));
        }
    }
}

使用订阅过滤器,在发布消息时添加的标头值用于过滤消息,以便只有匹配过滤器的消息才会转发到工作队列。

生成的拓扑:

Queues:
- a-command
- b-command

Topics:
- command
  Subscriptions:
  - a-command (WorkerId = "A") -> fwd a-command
  - b-command (WorkerId = "B") -> fwd b-command

为什么不直接发送?

如果您根本不想处理主题,您可以发送到队列并使用前缀自己格式化名称。然后,只需使用 ISendEndpointProvider(或传奇中的 .Send)到使用端点名称格式化程序结合前缀格式化的目标地址。

var destinationAddress = $"queue:{workerId}-{KebabCaseEndpointNameFormatter.Instance.Consumer<CommandConsumer>()}";
var endpoint = await sendEndpointProvider.GetSendEndpoint(new Uri(destinationAddress));
await endpoint.Send(new Command());

因此,有几个可能对您有用的选项。


推荐阅读