Wait for Mass Transit saga to finish

Problem description

I'm trying to create a saga that returns some result to the caller, just like the Request/Response pattern. I'm able to start the saga if I call the Send method, but not by submitting a Request.

So, the saga logic runs fine, but it doesn't return anything to the client.

Conversely, when I submit a Request, it is processed by its consumer and a response is returned to the client, but the saga never starts.

UPDATE: The answer to "masstransit deferred respond in sagas" doesn't seem to apply to my question, for two reasons:

1) I wasn't able to start the saga by calling the Request method;

2) If I call the Send method to send the request and, later on, send the response, the caller thread does not wait for the response to come back before continuing to the next line of code.

[END OF UPDATE]

Please find the complete code here. And below are the more relevant fragments:

Here is the saga class:

    public class MySaga : MassTransitStateMachine<MySagaState>
    {
        public static Uri address = new Uri($"loopback://localhost/req_resp_saga");

        public Event<IStartSagaCommand> StartSaga { get; private set; }

        public Request<MySagaState, MyRequest, MyResponse> SomeRequest { get; private set; }

        public MySaga()
        {
            InstanceState(s => s.CurrentState);

            Event(() => StartSaga,
                cc =>
                    cc.CorrelateBy(state => state.Data, context => context.Message.Data)
                        .SelectId(context => Guid.NewGuid()));

            Request(() => SomeRequest, x => x.NullableCorrelationId, cfg =>
            {
                cfg.ServiceAddress = address;
                cfg.SchedulingServiceAddress = address;
                cfg.Timeout = TimeSpan.FromSeconds(30);
            });

            Initially(
                When(StartSaga)
                    .Then(context =>
                    {
                        context.Instance.Data = context.Data.Data;
                    })
                    .ThenAsync(
                        context => Console.Out.WriteLineAsync($"Saga started: " +
                                                              $" {context.Data.Data} received"))
                    .Request(SomeRequest, context => new MyRequest() { CorrelationId = context.Instance.CorrelationId, RequestMessage = "Please do this" })
                    .TransitionTo(SomeRequest.Pending)
                    .ThenAsync(context => Console.Out.WriteLineAsync($"Transition completed: " +
                                                                     $" {(context.Instance.CurrentState == SomeRequest.Pending ? "pending" : "done")} received"))
                    //.Then(context =>
                    //{
                    //    var endpoint = context.GetSendEndpoint(address).GetAwaiter().GetResult();
                    //    endpoint.Send(new MyResponse() { CorrelationId = context.Instance.CorrelationId, ResponseMessage = "Your wish is my command" });
                    //})
            );

            During(SomeRequest.Pending,
                When(SomeRequest.Completed)
                    .ThenAsync(
                        context => Console.Out.WriteLineAsync($"Saga ended: " +
                                                              $" {context.Data.ResponseMessage} received"))
                    .Finalize()
            );
        }
    }

This starts the saga but doesn't wait for it to finish and respond:

            var address = new Uri($"loopback://localhost/req_resp_saga");
            var endPoint = bus.GetSendEndpoint(address)
                .Result;

            endPoint.Send<IStartSagaCommand>(new { Data = "Hello World!!" });

And this waits for a response but doesn't involve the saga at all:

            var address = new Uri($"loopback://localhost/req_resp_saga");
            var requestClient = new MessageRequestClient<MyRequest, MyResponse>(bus, address, TimeSpan.FromSeconds(30));
            var response = requestClient.Request(new MyRequest() { CorrelationId = Guid.NewGuid(), RequestMessage = "Please do this" })
                .GetAwaiter()
                .GetResult();

How can I have the caller start the saga, wait for it to finish, and then do something with its response?

Tags: c#, masstransit, request-response

Solution


I was able to find the solution myself.

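The problem was that I was confused about how to trigger the saga with a Request call. I thought I had to declare an Automatonymous

Request<in TInstance, TRequest, TResponse>

in the state machine. That was not working for me, because that declaration describes a request the saga itself sends to another endpoint, not a request the saga receives.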

In addition, the Event I used to start the saga had its own interface

Event<IStartSagaCommand>

which was not the same message type I was using when calling the Request method:

            var requestClient = new MessageRequestClient<MyRequest, MyResponse>(bus, address, TimeSpan.FromSeconds(30));
            var response = requestClient.Request(new MyRequest() { CorrelationId = Guid.NewGuid(), RequestMessage = "Please do this" })
                .GetAwaiter()
                .GetResult();

So the fix was to change the declaration of the event to

Event<MyRequest>

Now the saga starts whenever I call Request with a MyRequest message, and the caller waits for the response from the saga.
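For illustration, here is a minimal sketch of what a state machine along these lines might look like. It is not the code that was pushed to the repository. The shapes of MyRequest, MyResponse and MySagaState are assumed from the fragments in the question, the response text is made up, and the use of the Respond activity to reply to the caller is an assumption about how the reply is produced. It uses the same context.Instance / context.Data style as the question and needs the MassTransit and Automatonymous packages to compile.

```csharp
using System;
using Automatonymous;
using MassTransit;

// Message contracts; property names assumed from the question's snippets.
public class MyRequest
{
    public Guid CorrelationId { get; set; }
    public string RequestMessage { get; set; }
}

public class MyResponse
{
    public Guid CorrelationId { get; set; }
    public string ResponseMessage { get; set; }
}

// Saga instance; assumed shape, since the question does not show this class.
public class MySagaState : SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; }
    public string Data { get; set; }
}

public class MySaga : MassTransitStateMachine<MySagaState>
{
    // The start event is declared on the same type the request client sends.
    public Event<MyRequest> StartSaga { get; private set; }

    public MySaga()
    {
        InstanceState(s => s.CurrentState);

        // Correlate on the CorrelationId the caller puts in the request.
        Event(() => StartSaga,
            cc => cc.CorrelateById(context => context.Message.CorrelationId));

        Initially(
            When(StartSaga)
                .Then(context => context.Instance.Data = context.Data.RequestMessage)
                // Assumption: reply to the waiting request client from inside the saga.
                .Respond(context => new MyResponse
                {
                    CorrelationId = context.Instance.CorrelationId,
                    ResponseMessage = "Your wish is my command" // illustrative text
                })
                .Finalize());
    }
}
```

With a state machine like this hosted on the req_resp_saga endpoint, the MessageRequestClient call shown above would both create the saga instance and block until the saga's MyResponse arrives or the 30 second timeout elapses.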

I also made some other changes to clean up the code a little and pushed them to GitHub.

