首页 > 解决方案 > 捷运如何添加活动

问题描述

我用 RabbitMq 总线创建了服务。现在,我正在尝试向该总线添加活动(我尝试了找到的每个注册选项)

    private void ConfigureBus(IServiceCollectionConfigurator configurator)
    {
        configurator.AddActivity(typeof(StepAcitivity));
        configurator.AddActivitiesFromNamespaceContaining<StepAcitivity>();
        configurator.AddActivity<StepAcitivity, StepArguments, StepLog>();
        configurator.AddConsumer<PingConsumer>();
    }

我还在容器中注册了 StepAcitivity:

    services.AddScoped<StepAcitivity>();

从另一个服务:首先:测试按预期工作的 ping 请求/响应:

    IRequestClient<IPingRequest, IPingResponse> client =
                   busControl.CreateRequestClient<IPingRequest, IPingResponse>(serviceAddress, TimeSpan.FromSeconds(10));
                IPingResponse s = await client.Request(request);

第二:创建和执行活动

    string address = config.GetValue<string>("executorAddress");
    Uri sendAddress = new Uri($"queue:{address}");
    RoutingSlipBuilder builder = new RoutingSlipBuilder(NewId.NextGuid());
    
    builder.AddActivity("SetepA", sendAddress);
    builder.AddActivity("StepB", sendAddress);
    builder.AddActivity("StepC", sendAddress);
    RoutingSlip routingSlip = builder.Build();
    return routingSlip;

执行:

    await bus.Execute(routingSlip);

ping 请求/响应按预期工作但 routingSlip 没有失败但我在活动端没有接到任何电话的问题。在 rabbitMQ 管理中,可以看到该消息已移至 WorkerService_skipped 队列。

  1. 我在工作人员服务端的活动注册中缺少什么?(总线看起来不错,因为我能够运行发布 IPingRequest 请求并读取同一地址的响应)
  2. 如果消息移动到跳过的队列,为什么调用 await bus.Execute(routingSlip)没有下降?我的公共交通版本是:6.2.3

我的工人服务探测:

    {
      "resultId": "a4984523-5ffc-4ff4-8ceb-498029dbfeff",
      "probeId": "561351df-42da-44a7-83ce-29a467966663",
      "startTimestamp": "2020-07-06T12:51:32.0106524Z",
      "duration": "00:00:00.0427169",
      "host": {
        "machineName": "SLAVAF-LAP",
        "processName": "WorkerService",
        "processId": 38212,
        "assembly": "WorkerService",
        "assemblyVersion": "1.0.0.0",
        "frameworkVersion": "3.1.5",
        "greenPipesVersion": "3.0.1.0",
        "operatingSystemVersion": "Microsoft Windows NT 6.2.9200.0"
      },
      "results": {
        "bus": {
          "address": "rabbitmq://localhost:0/SLAVAFLAP_WorkerService_bus_x37yyyd4rz1m1wcfbdcndk4enp?temporary=true",
          "host": {
            "Type": "RabbitMQ",
            "Host": "localhost",
            "Port": 5672,
            "VirtualHost": "/",
            "Username": "slava",
            "Password": "****",
            "Heartbeat": 0,
            "Ssl": false,
            "connected": true,
            "receiveEndpoint": [
              {
                "name": "WorkerService",
                "started": true,
                "transport": {
                  "type": "RabbitMQ",
                  "ExchangeName": "WorkerService",
                  "ExchangeArguments": [],
                  "ExchangeType": "fanout",
                  "Durable": true,
                  "AutoDelete": false,
                  "QueueName": "WorkerService",
                  "Exclusive": false,
                  "QueueArguments": [
                    [
                      "x-queue-type",
                      {}
                    ]
                  ],
                  "BindingArguments": [],
                  "PrefetchCount": 16,
                  "PurgeOnStartup": false,
                  "ExclusiveConsumer": false,
                  "NoAck": false,
                  "BindQueue": true,
                  "ConsumeArguments": [],
                  "topology": {
                    "exchange": [
                      {
                        "Name": "WorkerService",
                        "Type": "fanout",
                        "Durable": true,
                        "AutoDelete": false
                      },
                      {
                        "Name": "Common.Messages.Ping:IPingRequest",
                        "Type": "fanout",
                        "Durable": true,
                        "AutoDelete": false
                      }
                    ],
                    "queue": {
                      "Name": "WorkerService",
                      "Durable": true,
                      "AutoDelete": false,
                      "Exclusive": false,
                      "argument": {
                        "key": "x-queue-type",
                        "value": "quorum"
                      }
                    },
                    "exchange-binding": {
                      "Source": "Common.Messages.Ping:IPingRequest",
                      "Destination": "WorkerService"
                    },
                    "queue-binding": {
                      "Source": "WorkerService",
                      "Destination": "WorkerService"
                    }
                  }
                },
                "filters": [
                  {
                    "filterType": "deadLetter",
                    "filters": {
                      "filterType": "dead-letter"
                    }
                  },
                  {
                    "filterType": "rescue",
                    "filters": [
                      {
                        "filterType": "generateFault"
                      },
                      {
                        "filterType": "moveFault"
                      }
                    ]
                  },
                  {
                    "filterType": "deserialize",
                    "deserializers": {
                      "json": {
                        "contentType": "application/vnd.masstransit+json"
                      },
                      "bson": {
                        "contentType": "application/vnd.masstransit+bson"
                      },
                      "xml": {
                        "contentType": "application/vnd.masstransit+xml"
                      }
                    },
                    "consumePipe": {
                      "filters": {
                        "filterType": "dispatchPipe",
                        "outputType": "MassTransit.ConsumeContext<Common.Messages.Ping.IPingRequest>",
                        "filters": {
                          "filterType": "MessageFilter",
                          "Type": "Common.Messages.Ping.IPingRequest"
                        },
                        "consumer": {
                          "type": "WorkerService.Consumers.PingConsumer",
                          "consumerFactory": {
                            "source": "scope",
                            "consumerType": "WorkerService.Consumers.PingConsumer",
                            "provider": "dependencyInjection"
                          },
                          "filters": {
                            "filterType": "split",
                            "ConsumerType": "WorkerService.Consumers.PingConsumer",
                            "filters": {
                              "filterType": "ScopedLoggingFilter"
                            }
                          },
                          "consume": {
                            "method": "Consume(ConsumeContext<Common.Messages.Ping.IPingRequest> context)"
                          }
                        }
                      }
                    }
                  }
                ]
              },
              {
                "name": "SLAVAFLAP_WorkerService_bus_x37yyyd4rz1m1wcfbdcndk4enp",
                "started": true,
                "transport": {
                  "type": "RabbitMQ",
                  "ExchangeName": "SLAVAFLAP_WorkerService_bus_x37yyyd4rz1m1wcfbdcndk4enp",
                  "ExchangeArguments": [],
                  "ExchangeType": "fanout",
                  "Durable": false,
                  "AutoDelete": true,
                  "QueueName": "SLAVAFLAP_WorkerService_bus_x37yyyd4rz1m1wcfbdcndk4enp",
                  "Exclusive": false,
                  "QueueExpiration": "00:01:00",
                  "QueueArguments": [],
                  "BindingArguments": [],
                  "PrefetchCount": 16,
                  "PurgeOnStartup": false,
                  "ExclusiveConsumer": false,
                  "NoAck": false,
                  "BindQueue": true,
                  "ConsumeArguments": [],
                  "topology": {
                    "exchange": {
                      "Name": "SLAVAFLAP_WorkerService_bus_x37yyyd4rz1m1wcfbdcndk4enp",
                      "Type": "fanout",
                      "Durable": false,
                      "AutoDelete": true
                    },
                    "queue": {
                      "Name": "SLAVAFLAP_WorkerService_bus_x37yyyd4rz1m1wcfbdcndk4enp",
                      "Durable": false,
                      "AutoDelete": false,
                      "Exclusive": false,
                      "argument": {
                        "key": "x-expires",
                        "value": 60000
                      }
                    },
                    "queue-binding": {
                      "Source": "SLAVAFLAP_WorkerService_bus_x37yyyd4rz1m1wcfbdcndk4enp",
                      "Destination": "SLAVAFLAP_WorkerService_bus_x37yyyd4rz1m1wcfbdcndk4enp"
                    }
                  }
                },
                "filters": [
                  {
                    "filterType": "deadLetter",
                    "filters": {
                      "filterType": "dead-letter"
                    }
                  },
                  {
                    "filterType": "rescue",
                    "filters": [
                      {
                        "filterType": "generateFault"
                      },
                      {
                        "filterType": "moveFault"
                      }
                    ]
                  },
                  {
                    "filterType": "deserialize",
                    "deserializers": {
                      "json": {
                        "contentType": "application/vnd.masstransit+json"
                      },
                      "bson": {
                        "contentType": "application/vnd.masstransit+bson"
                      },
                      "xml": {
                        "contentType": "application/vnd.masstransit+xml"
                      }
                    },
                    "consumePipe": {}
                  }
                ]
              }
            ]
          }
        }
      }
    }

标签: rabbitmqmasstransit

解决方案


每个活动都需要一个单独的端点来执行,如果活动支持补偿,则需要一个额外的端点来进行补偿。您不能在同一个端点上配置您的活动,并且端点不应与其他服务共享。

使用 AddActivity() 足以配置容器,您无需使用您的活动调用 AddScoped。

此外,您可以使用 ConfigureEndpoints 并传递注册上下文来为所有活动、sagas 和消费者配置总线端点。

您包含的探测(太好了,谢谢)表明您只有一个接收端点 WorkerService,它有一个处理IPingRequest消息的使用者。


推荐阅读