.net-core - MassTransit 和 RabbitMQ 中 ApiGateway-MicroService 通信模式的最佳设置
问题描述
我正在使用 MassTransit 和 RabbitMQ 在我的应用程序中实现 ApiGateway-MicroService 通信协议。该协议旨在取代 ApiGateway 和微服务之间的“传统”REST API 通信(我在这里谈论的是简单的请求-响应,而不是任何类型的事件、sagas 等)。因此,在微服务方面,我有消费者(响应请求),在 ApiGateway 方面,我有请求客户端。通常微服务有大约 10 个消费者(例如 OrderingMicroservice 有以下请求的消费者:CreateOrder、UpdateOrder、GetOrderById、ListUserOrders 等)。我正在尝试为这种情况找出最佳拓扑(Masstransit + RabbitMQ)。
这是我的目标,至少我认为它应该像这样工作:
A.请求消息(路由到消费者队列)应该只在短时间内(例如 20 秒)持久,然后从消费者队列中删除(并且请求客户端应该收到超时错误)并且不路由到任何其他队列。因此,当微服务暂时关闭或暂时太忙而无法从队列接收下一个请求时,请求消息应在队列中保留 20 秒然后消失。
B.由于 RequestClient 应该在约 20 秒后超时,响应消息(路由到客户端“响应队列”)也应该在短时间内(约 20 秒)持久。然后他们可以消失。如果 ApiGW 离线/太忙而无法接收响应,则应丢弃响应。
所以基本上我想使用 MassTransit/RabbitMQ 作为 ApiGW 和微服务之间的短暂缓冲区。
// ApiGw MassTransit configuration
services.AddMassTransit(x =>
{
x.SetKebabCaseEndpointNameFormatter();
x.UsingRabbitMq((context, cfg) =>
{
});
x.AddRequestClient<ICreateGroupPayload>();
});
// Service MassTransit configuration
services.AddMassTransit(x =>
{
x.SetKebabCaseEndpointNameFormatter();
var entryAssembly = Assembly.GetEntryAssembly();
x.AddConsumers(entryAssembly);
x.UsingRabbitMq((context, cfg) =>
{
cfg.ConfigureEndpoints(context);
});
});
// Single consumer definition in service
public class CreateGroupActionDefinition : ConsumerDefinition<CreateGroupAction>
{
public CreateGroupActionDefinition()
{
EndpointName = "group-service";
}
}
此设置创建以下交换和队列:
exchange ICreateGroupPayload (fanout, durable) => bind exchange:group-service
exchange group-service (fanout, durable) => bind queue:group-service
exchange PublicGateway_bus_4wdoyyro5ycgmgbybdcx1gp3r3 (fanout, autoDelete) => bind queue:PublicGateway_bus_4wdoyyro5ycgmgbybdcx1gp3r3
queue group-service (durable)
queue PublicGateway_bus_4wdoyyro5ycgmgbybdcx1gp3r3 (x-expires: 60000)
当我在 ~1 分钟内从 RabbitMQ 中删除交换/队列之后终止 ApiGw 时:
exchange PublicGateway_bus_4wdoyyro5ycgmgbybdcx1gp3r3
queue PublicGateway_bus_4wdoyyro5ycgmgbybdcx1gp3r3
我的问题是:
我应该为微服务中的不同消费者使用单独的队列(端点名称)吗?或者我可以为不同的消费者/消息类型使用相同的队列(例如组服务)?
如何修改我的配置以在我的消费者队列上设置过期时间?现在它很耐用,但我希望在大约 20 秒后删除消息。另外我认为这样的队列不应该在消费者断开连接后被删除,因为即使消费者离线(但只能持续 20 秒)它也应该能够发送请求。
如何修改配置以将请求客户端响应队列的过期时间设置为 20 秒(目前默认情况下似乎是 60 秒?)。
也许有人对如何调整拓扑以最适合这种情况有任何其他建议?目的是尽可能快地设置简单的请求响应+边缘情况的短时间缓冲。
解决方案
正如您从请求文档中了解的那样,所有工作都由 MassTransit 完成。将请求客户端添加到容器时,您可以将默认请求超时从 30 秒更改为 20 秒。还有一种.AddGenericRequestClient()
方法可以为需要的任何请求类型自动添加请求客户端。
您还可以为每个请求指定请求超时,它将设置消息 TimeToLive 以匹配该值。响应应根据需要与 TimeToLive 一起发送。
推荐阅读
- firebase - 部署 firebase 应用程序时出错:命令需要范围
- python - 如何从数据框中排除特定行?
- angular - IONIC 5 找不到类型为“object”的不同支持对象“[object Object]”。NgFor 仅支持绑定到 Iterables,例如 Arrays
- erlang - 未调用 Ejabberd 中的自定义模块
- docker - 码头工人错误!bash:docker.exe:找不到命令
- mysql - 如何在 MySQL 的数据透视表上更好地使用逻辑门
- python - 基于日志数据列有效地重复熊猫数据帧行
- python - Python:无法从 API 获取数据
- java - 如何在不循环的情况下从 Java 中的 List 中获取特定的 Column 列表
- excel - 将 Excel 文件转换为文本文件