c# - Wait for Mass Transit saga to finish
问题描述
I'm trying to create a saga that returns some result to the caller, just like the Request/Response pattern. I'm able to start the saga if I call the Send method, but not by submitting a Request.
So, the saga logic runs fine, but it doesn't return anything to the client.
Or submitting a Request gets processed by it's consumer and returns a response to the client, but never starts the saga.
UPDATE: The answer to masstransit deferred respond in sagas doesn't seem to apply to my question for two reasons:
1) I wasn't able to start the saga by calling the Request method;
2) If I call the Send method to send the request and, later on, send the response, the caller thread does not wait for the response to get back before continuing to the next line of code;
[END OF UPDATE]
Please find the complete code here. And below are the more relevant fragments:
Here is the saga class:
public class MySaga : MassTransitStateMachine<MySagaState>
{
public static Uri address = new Uri($"loopback://localhost/req_resp_saga");
public Event<IStartSagaCommand> StartSaga { get; private set; }
public Request<MySagaState, MyRequest, MyResponse> SomeRequest { get; private set; }
public MySaga()
{
InstanceState(s => s.CurrentState);
Event(() => StartSaga,
cc =>
cc.CorrelateBy(state => state.Data, context => context.Message.Data)
.SelectId(context => Guid.NewGuid()));
Request(() => SomeRequest, x => x.NullableCorrelationId, cfg =>
{
cfg.ServiceAddress = address;
cfg.SchedulingServiceAddress = address;
cfg.Timeout = TimeSpan.FromSeconds(30);
});
Initially(
When(StartSaga)
.Then(context =>
{
context.Instance.Data = context.Data.Data;
})
.ThenAsync(
context => Console.Out.WriteLineAsync($"Saga started: " +
$" {context.Data.Data} received"))
.Request(SomeRequest, context => new MyRequest() { CorrelationId = context.Instance.CorrelationId, RequestMessage = "Please do this" })
.TransitionTo(SomeRequest.Pending)
.ThenAsync(context => Console.Out.WriteLineAsync($"Transition completed: " +
$" {(context.Instance.CurrentState == SomeRequest.Pending ? "pending" : "done")} received"))
//.Then(context =>
//{
// var endpoint = context.GetSendEndpoint(address).GetAwaiter().GetResult();
// endpoint.Send(new MyResponse() { CorrelationId = context.Instance.CorrelationId, ResponseMessage = "Your wish is my command" });
//})
);
During(SomeRequest.Pending,
When(SomeRequest.Completed)
.ThenAsync(
context => Console.Out.WriteLineAsync($"Saga ended: " +
$" {context.Data.ResponseMessage} received"))
.Finalize()
);
}
}
This starts the saga but doesn't wait for it to finish and respond:
var address = new Uri($"loopback://localhost/req_resp_saga");
var endPoint = bus.GetSendEndpoint(address)
.Result;
endPoint.Send<IStartSagaCommand>(new { Data = "Hello World!!" });
And this waits for a response but doesn't involve the saga at all:
var address = new Uri($"loopback://localhost/req_resp_saga");
var requestClient = new MessageRequestClient<MyRequest, MyResponse>(bus, address, TimeSpan.FromSeconds(30));
var response = requestClient.Request(new MyRequest() { CorrelationId = Guid.NewGuid(), RequestMessage = "Please do this" })
.GetAwaiter()
.GetResult();
How can I have the caller start the saga and wait for it to finish and do something with its reponse?
解决方案
I was able to find the solution myself.
The problem was that I was confused about how to trigger the saga with a Request call. I thought I had to declare a
Request<in TInstance, TRequest, TResponse>
(Automatonimous)
That was not working for me.
And the Event I used to start the saga had it's own interface
Event<IStartSaga>
which was not the same I was using when calling the Request method
var requestClient = new MessageRequestClient<MyRequest, MyResponse>(bus, address, TimeSpan.FromSeconds(30));
var response = requestClient.Request(new MyRequest() { CorrelationId = Guid.NewGuid(), RequestMessage = "Please do this" })
.GetAwaiter()
.GetResult();
So the fix was to change the declaration of the event to
Event<MyRequest>
Now the saga starts whenever I call Request with a MyResquest message. And the caller waits for the response from the saga.
I made some other changes to clean up the code a little and pushed it to github too.
推荐阅读
- asynchronous - Kotlin:具有 Suspend Function 返回类型的高阶函数
- ios - 缩小后折线开始消失
- flutter - 处理异常 HTTP 请求颤动
- amazon-web-services - AWS S3 PutBucketLifecycleConfiguration 上的 MalformedXML 错误
- powershell - 如何在执行下一步之前检查所有字符串
- json - 哪些APP或软件可以将IFC还原为JSON
- javascript - 在我的 theme.js.liquid 文件中使用产品的 variant.inventory_policy
- javascript - 如何在正则表达式匹配之前提取一行?
- python - 如何并行执行python subTests?
- javascript - 仅返回对象数组中的一些键