azureservicebus - 如何在不使用 Send 和 RequestAddress 的情况下,根据消息内容将两条相同类型的消息发布到不同的 worker 实例?
问题描述
如何在不使用 Send 和 RequestAddress 的情况下,根据消息内容将两条相同类型的消息发布到不同的 worker 实例?
我的情况是:
我正在使用 Azure ServiceBus 和 Azure StorageTables。
我正在运行同一个worker service workera和workerb的两个不同实例。我需要 workera 和 workerb 来根据 Command.WorkerPrefix 的值使用 Command 类型的消息。
命令类型如下所示:
{
WorkerPrefix = "a"
}
附加约束:
对于 worker 服务将使用的所有消息传递实体,我想添加一个不同的前缀,无论是 worker 还是 workerb,所以我使用cfg.SetEndpointNameFormatter(KebabCaseEndpointNameFormatter(args.[0], false))
args.[0]的代码 是“a”或“b”
现在我只定义了一个消费者,它消费一条命令类型的消息
这给了我一个像这样的拓扑:
Queues:
- a-command
- b-command
Topics:
- command
Subscriptions:
- a-command -> fwd a-command
- b-command -> fwd b-command
理想情况下,拓扑应该是:
Queues:
- a-command
- b-command
Topics:
- a-command
Subscriptions:
- a-command -> fwd a-command
- b-command
Subscriptions:
- b-command -> fwd b-command
这种理想拓扑结构的原因是,workera 没有权限在 workerb 的主题上创建订阅,反之亦然。如果这不可能,我可以通过在服务开始运行之前创建拓扑来解决部署时的权限问题,但更喜欢上面的拓扑。
现在,我想从 Saga 或 Future 的编排器发布两个 Command 类型的请求。一个是给工人的,一个是给工人的。
无论如何,是否可以根据消息的内容配置 IPublishEndpoint,以便发布者可以检查消息,看到 WorkerPrefix 是“a”,然后发布到 a-command 主题?
解决方案
将 MassTransit 与 Azure 服务总线结合使用,我建议将消息路由的负担从发布者转移到消费者身上。通过配置接收端点并使用订阅过滤器,每个实例将添加自己的订阅并使用消息头来过滤已发布的消息。
在发布者上,将添加一个消息头:
await publishEndpoint.Publish(new Command(), x => x.Headers.Set("WorkerId", "A"));
消息将发布到Command主题,然后通过订阅路由到接收端点。
使用订阅过滤器
例如A
,要配置接收端点,无论是手动配置接收端点还是使用消费者定义,都将配置接收端点配置器,如下所示。由于问题似乎是使用 ConfigureEndpoints,因此使用了消费者定义方法。
请注意,
Microsoft.Extensions.Options
用于将配置映射到容器,以便在配置总线时可用。
services.AddMassTransit(x =>
{
x.AddConsumer<CommandConsumer, CommandConsumerDefinition>();
x.SetEndpointNameFormatter(new KebabCaseEndpointNameFormatter("A", false));
x.UsingAzureServiceBus((context, cfg) =>
{
var hostOptions = context.GetRequiredService<IOptions<Host>>();
cfg.Host(hostOptions.Value.ConnectionString);
cfg.ConfigureEndpoints(context);
});
});
然后消费者定义将配置接收端点:
public class CommandConsumerDefinition :
ConsumerDefinition<CommandConsumer>
{
readonly string _workerId;
readonly IEndpointNameFormatter _endpointNameFormatter;
// using IOptions here, but somehow in the container the worker settings need to be resolved
// to get the workerId
public CommandConsumerDefinition(IOptions<Worker> options, IEndpointNameFormatter endpointNameFormatter)
{
_workerId = options.Value.WorkerId;
_endpointNameFormatter = endpointNameFormatter;
}
protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
IConsumerConfigurator<CommandConsumer> consumerConfigurator)
{
if (endpointConfigurator is IServiceBusReceiveEndpointConfigurator sb)
{
sb.ConfigureConsumeTopology = false;
var subscriptionName = _endpointNameFormatter.Consumer<CommandConsumer>();
sb.Subscribe<Command>(subscriptionName,
s => s.Filter = new SqlFilter($"WorkerId = '{_workerId}'"));
}
}
}
使用订阅过滤器,在发布消息时添加的标头值用于过滤消息,以便只有匹配过滤器的消息才会转发到工作队列。
生成的拓扑:
Queues:
- a-command
- b-command
Topics:
- command
Subscriptions:
- a-command (WorkerId = "A") -> fwd a-command
- b-command (WorkerId = "B") -> fwd b-command
为什么不直接发送?
如果您根本不想处理主题,您可以发送到队列并使用前缀自己格式化名称。然后,只需使用 ISendEndpointProvider(或传奇中的 .Send)到使用端点名称格式化程序结合前缀格式化的目标地址。
var destinationAddress = $"queue:{workerId}-{KebabCaseEndpointNameFormatter.Instance.Consumer<CommandConsumer>()}";
var endpoint = await sendEndpointProvider.GetSendEndpoint(new Uri(destinationAddress));
await endpoint.Send(new Command());
因此,有几个可能对您有用的选项。
推荐阅读
- sqlite - sqlite 复制 .db 文件不会复制文件的最后一个版本
- python - Python中的任何类型的范围解析运算符?
- c - 如何从睡眠模式唤醒 Pic32?
- ios - 在文本结束之前和之后放置操作按钮
- ios - Swift 4 GIDSignIn“未声明的类型”错误
- javascript - React 状态是批处理生命周期和非事件函数
- matlab - 如何获取txt文件中具有特定字符串的行号 - matlab
- sql - 在 SQL Server Management Studio 中的 group by 语句中仅显示 case 语句的最大值
- php - 类 api 在 laravel 上不存在反射异常
- computer-vision - 关于pyorch中的autograd,添加新的用户自定义层,我应该如何更新它的参数?