rabbitmq - MassTransit 如何添加活动(Activity)
问题描述
我用 RabbitMq 总线创建了服务。现在,我正在尝试向该总线添加活动(我尝试了找到的每个注册选项)
/// <summary>
/// Registers the worker service's activity and consumer types on the supplied configurator.
/// </summary>
/// <remarks>
/// The three activity registrations below all target the same StepAcitivity type. According to
/// the surrounding question text they are alternative attempts that were tried together, not a
/// combination that is known to be required.
/// </remarks>
/// <param name="configurator">The registration configurator that receives the activity and consumer registrations.</param>
private void ConfigureBus(IServiceCollectionConfigurator configurator)
{
// Attempt 1: register the activity by passing its Type object.
configurator.AddActivity(typeof(StepAcitivity));
// Attempt 2: presumably registers every activity found in StepAcitivity's namespace - TODO confirm against the MassTransit docs.
configurator.AddActivitiesFromNamespaceContaining<StepAcitivity>();
// Attempt 3: generic registration naming the activity together with its StepArguments and StepLog types.
configurator.AddActivity<StepAcitivity, StepArguments, StepLog>();
// Registers the consumer type used for the ping request/response check.
configurator.AddConsumer<PingConsumer>();
}
我还在容器中注册了 StepAcitivity:
services.AddScoped<StepAcitivity>();
从另一个服务:首先:测试按预期工作的 ping 请求/响应:
IRequestClient<IPingRequest, IPingResponse> client =
busControl.CreateRequestClient<IPingRequest, IPingResponse>(serviceAddress, TimeSpan.FromSeconds(10));
IPingResponse s = await client.Request(request);
第二:创建和执行活动
string address = config.GetValue<string>("executorAddress");
Uri sendAddress = new Uri($"queue:{address}");
RoutingSlipBuilder builder = new RoutingSlipBuilder(NewId.NextGuid());
builder.AddActivity("SetepA", sendAddress);
builder.AddActivity("StepB", sendAddress);
builder.AddActivity("StepC", sendAddress);
RoutingSlip routingSlip = builder.Build();
return routingSlip;
执行:
await bus.Execute(routingSlip);
ping 请求/响应按预期工作,routingSlip 的执行也没有报错,但问题是活动端没有收到任何调用。在 RabbitMQ 管理界面中,可以看到该消息已被移至 WorkerService_skipped 队列。
- 我在 Worker 服务端的活动注册中缺少了什么?(总线看起来没有问题,因为我能够向同一地址发送 IPingRequest 请求并读取到响应)
- 如果消息移动到跳过的队列,为什么调用
await bus.Execute(routingSlip)
没有失败(没有抛出异常)?我使用的 MassTransit 版本是:6.2.3
我的 Worker 服务的探测(probe)结果:
{
"resultId": "a4984523-5ffc-4ff4-8ceb-498029dbfeff",
"probeId": "561351df-42da-44a7-83ce-29a467966663",
"startTimestamp": "2020-07-06T12:51:32.0106524Z",
"duration": "00:00:00.0427169",
"host": {
"machineName": "SLAVAF-LAP",
"processName": "WorkerService",
"processId": 38212,
"assembly": "WorkerService",
"assemblyVersion": "1.0.0.0",
"frameworkVersion": "3.1.5",
"greenPipesVersion": "3.0.1.0",
"operatingSystemVersion": "Microsoft Windows NT 6.2.9200.0"
},
"results": {
"bus": {
"address": "rabbitmq://localhost:0/SLAVAFLAP_WorkerService_bus_x37yyyd4rz1m1wcfbdcndk4enp?temporary=true",
"host": {
"Type": "RabbitMQ",
"Host": "localhost",
"Port": 5672,
"VirtualHost": "/",
"Username": "slava",
"Password": "****",
"Heartbeat": 0,
"Ssl": false,
"connected": true,
"receiveEndpoint": [
{
"name": "WorkerService",
"started": true,
"transport": {
"type": "RabbitMQ",
"ExchangeName": "WorkerService",
"ExchangeArguments": [],
"ExchangeType": "fanout",
"Durable": true,
"AutoDelete": false,
"QueueName": "WorkerService",
"Exclusive": false,
"QueueArguments": [
[
"x-queue-type",
{}
]
],
"BindingArguments": [],
"PrefetchCount": 16,
"PurgeOnStartup": false,
"ExclusiveConsumer": false,
"NoAck": false,
"BindQueue": true,
"ConsumeArguments": [],
"topology": {
"exchange": [
{
"Name": "WorkerService",
"Type": "fanout",
"Durable": true,
"AutoDelete": false
},
{
"Name": "Common.Messages.Ping:IPingRequest",
"Type": "fanout",
"Durable": true,
"AutoDelete": false
}
],
"queue": {
"Name": "WorkerService",
"Durable": true,
"AutoDelete": false,
"Exclusive": false,
"argument": {
"key": "x-queue-type",
"value": "quorum"
}
},
"exchange-binding": {
"Source": "Common.Messages.Ping:IPingRequest",
"Destination": "WorkerService"
},
"queue-binding": {
"Source": "WorkerService",
"Destination": "WorkerService"
}
}
},
"filters": [
{
"filterType": "deadLetter",
"filters": {
"filterType": "dead-letter"
}
},
{
"filterType": "rescue",
"filters": [
{
"filterType": "generateFault"
},
{
"filterType": "moveFault"
}
]
},
{
"filterType": "deserialize",
"deserializers": {
"json": {
"contentType": "application/vnd.masstransit+json"
},
"bson": {
"contentType": "application/vnd.masstransit+bson"
},
"xml": {
"contentType": "application/vnd.masstransit+xml"
}
},
"consumePipe": {
"filters": {
"filterType": "dispatchPipe",
"outputType": "MassTransit.ConsumeContext<Common.Messages.Ping.IPingRequest>",
"filters": {
"filterType": "MessageFilter",
"Type": "Common.Messages.Ping.IPingRequest"
},
"consumer": {
"type": "WorkerService.Consumers.PingConsumer",
"consumerFactory": {
"source": "scope",
"consumerType": "WorkerService.Consumers.PingConsumer",
"provider": "dependencyInjection"
},
"filters": {
"filterType": "split",
"ConsumerType": "WorkerService.Consumers.PingConsumer",
"filters": {
"filterType": "ScopedLoggingFilter"
}
},
"consume": {
"method": "Consume(ConsumeContext<Common.Messages.Ping.IPingRequest> context)"
}
}
}
}
}
]
},
{
"name": "SLAVAFLAP_WorkerService_bus_x37yyyd4rz1m1wcfbdcndk4enp",
"started": true,
"transport": {
"type": "RabbitMQ",
"ExchangeName": "SLAVAFLAP_WorkerService_bus_x37yyyd4rz1m1wcfbdcndk4enp",
"ExchangeArguments": [],
"ExchangeType": "fanout",
"Durable": false,
"AutoDelete": true,
"QueueName": "SLAVAFLAP_WorkerService_bus_x37yyyd4rz1m1wcfbdcndk4enp",
"Exclusive": false,
"QueueExpiration": "00:01:00",
"QueueArguments": [],
"BindingArguments": [],
"PrefetchCount": 16,
"PurgeOnStartup": false,
"ExclusiveConsumer": false,
"NoAck": false,
"BindQueue": true,
"ConsumeArguments": [],
"topology": {
"exchange": {
"Name": "SLAVAFLAP_WorkerService_bus_x37yyyd4rz1m1wcfbdcndk4enp",
"Type": "fanout",
"Durable": false,
"AutoDelete": true
},
"queue": {
"Name": "SLAVAFLAP_WorkerService_bus_x37yyyd4rz1m1wcfbdcndk4enp",
"Durable": false,
"AutoDelete": false,
"Exclusive": false,
"argument": {
"key": "x-expires",
"value": 60000
}
},
"queue-binding": {
"Source": "SLAVAFLAP_WorkerService_bus_x37yyyd4rz1m1wcfbdcndk4enp",
"Destination": "SLAVAFLAP_WorkerService_bus_x37yyyd4rz1m1wcfbdcndk4enp"
}
}
},
"filters": [
{
"filterType": "deadLetter",
"filters": {
"filterType": "dead-letter"
}
},
{
"filterType": "rescue",
"filters": [
{
"filterType": "generateFault"
},
{
"filterType": "moveFault"
}
]
},
{
"filterType": "deserialize",
"deserializers": {
"json": {
"contentType": "application/vnd.masstransit+json"
},
"bson": {
"contentType": "application/vnd.masstransit+bson"
},
"xml": {
"contentType": "application/vnd.masstransit+xml"
}
},
"consumePipe": {}
}
]
}
]
}
}
}
}
解决方案
每个活动都需要一个单独的端点来执行,如果活动支持补偿,则需要一个额外的端点来进行补偿。您不能在同一个端点上配置您的活动,并且端点不应与其他服务共享。
使用 AddActivity() 足以配置容器,您无需使用您的活动调用 AddScoped。
此外,您可以使用 ConfigureEndpoints 并传递注册上下文来为所有活动、sagas 和消费者配置总线端点。
您包含的探测(太好了,谢谢)表明您只有一个接收端点 WorkerService,它有一个处理
IPingRequest
消息的消费者(consumer)。