首页 > 解决方案 > MassTransit 和 RabbitMQ 中 ApiGateway-MicroService 通信模式的最佳设置

问题描述

我正在使用 MassTransit 和 RabbitMQ 在我的应用程序中实现 ApiGateway-MicroService 通信协议。该协议旨在取代 ApiGateway 和微服务之间的“传统”REST API 通信(我在这里谈论的是简单的请求-响应,而不是任何类型的事件、sagas 等)。因此,在微服务方面,我有消费者(响应请求),在 ApiGateway 方面,我有请求客户端。通常微服务有大约 10 个消费者(例如 OrderingMicroservice 有以下请求的消费者:CreateOrder、UpdateOrder、GetOrderById、ListUserOrders 等)。我正在尝试为这种情况找出最佳拓扑(Masstransit + RabbitMQ)。

这是我的目标,至少我认为它应该像这样工作:

A.请求消息(路由到消费者队列)应该只在短时间内(例如 20 秒)持久,然后从消费者队列中删除(并且请求客户端应该收到超时错误)并且不路由到任何其他队列。因此,当微服务暂时关闭或暂时太忙而无法从队列接收下一个请求时,请求消息应在队列中保留 20 秒然后消失。

B.由于 RequestClient 应该在约 20 秒后超时,响应消息(路由到客户端“响应队列”)也应该在短时间内(约 20 秒)持久。然后他们可以消失。如果 ApiGW 离线/太忙而无法接收响应,则应丢弃响应。

所以基本上我想使用 MassTransit/RabbitMQ 作为 ApiGW 和微服务之间的短暂缓冲区。

    // ApiGw MassTransit configuration
    services.AddMassTransit(x =>
                {
                    x.SetKebabCaseEndpointNameFormatter();
                    x.UsingRabbitMq((context, cfg) =>
                    {
                        
                    });
                    
                    x.AddRequestClient<ICreateGroupPayload>();
                });
    // Service MassTransit configuration
    services.AddMassTransit(x =>
                        {
                            x.SetKebabCaseEndpointNameFormatter();
                            var entryAssembly = Assembly.GetEntryAssembly();
    
                            x.AddConsumers(entryAssembly);
    
                            x.UsingRabbitMq((context, cfg) =>
                            {
                                cfg.ConfigureEndpoints(context);
                            });
                        });
// Single consumer definition in service
public class CreateGroupActionDefinition : ConsumerDefinition<CreateGroupAction>
    {
        public CreateGroupActionDefinition()
        {
            EndpointName = "group-service";
        }
    }

此设置创建以下交换和队列:

exchange ICreateGroupPayload (fanout, durable) => bind exchange:group-service
exchange group-service (fanout, durable) => bind queue:group-service
exchange PublicGateway_bus_4wdoyyro5ycgmgbybdcx1gp3r3 (fanout, autoDelete) => bind queue:PublicGateway_bus_4wdoyyro5ycgmgbybdcx1gp3r3

queue group-service (durable)
queue PublicGateway_bus_4wdoyyro5ycgmgbybdcx1gp3r3 (x-expires: 60000)

当我在 ~1 分钟内从 RabbitMQ 中删除交换/队列之后终止 ApiGw 时:

exchange PublicGateway_bus_4wdoyyro5ycgmgbybdcx1gp3r3
queue PublicGateway_bus_4wdoyyro5ycgmgbybdcx1gp3r3

我的问题是:

  1. 我应该为微服务中的不同消费者使用单独的队列(端点名称)吗?或者我可以为不同的消费者/消息类型使用相同的队列(例如组服务)?

  2. 如何修改我的配置以在我的消费者队列上设置过期时间?现在它很耐用,但我希望在大约 20 秒后删除消息。另外我认为这样的队列不应该在消费者断开连接后被删除,因为即使消费者离线(但只能持续 20 秒)它也应该能够发送请求。

  3. 如何修改配置以将请求客户端响应队列的过期时间设置为 20 秒(目前默认情况下似乎是 60 秒?)。

  4. 也许有人对如何调整拓扑以最适合这种情况有任何其他建议?目的是尽可能快地设置简单的请求响应+边缘情况的短时间缓冲。

标签: .net-coremasstransit

解决方案


正如您从请求文档中了解的那样,所有工作都由 MassTransit 完成。将请求客户端添加到容器时,您可以将默认请求超时从 30 秒更改为 20 秒。还有一种.AddGenericRequestClient()方法可以为需要的任何请求类型自动添加请求客户端。

您还可以为每个请求指定请求超时,它将设置消息 TimeToLive 以匹配该值。响应应根据需要与 TimeToLive 一起发送。


推荐阅读