首页 > 解决方案 > 使用 MassTransit.AmazonSQS 安排活动

问题描述

我在我的软件中使用masstransit 进行分布式消息传递,我试图为这个特定用例实现的是创建一个预定消息,该消息将在特定时间后触发

await  _publishEndpoint.SchedulePublish<ScheduleNotification>( 
        DateTime.UtcNow + TimeSpan.FromSeconds(180),
        new
    {
         DeliveryTime = DateTime.Now.AddMinutes(1.5),
         scheduleNotification.Body,
         scheduleNotification.EmailAddress
    });

https://github.com/makafanpeter/Demo.ScheduleMessages/blob/6f92955d8697acaefdac74426210d7411cd69a82/ScheduleMessages/Controllers/NotificationController.cs#L30

但我在这里注意到的是,这条消息是在它发布后立即触发的,而不是等到预定的时间。

public async Task Consume(ConsumeContext<ScheduleNotification> context)
{
    await context.ScheduleSend<SendNotification>(
        context.Message.DeliveryTime,
        new 
        {
            EmailAddress = context.Message.EmailAddress,
            Body =  context.Message.Body
        });
}

使用 In-Memory 传输设置 MassTransit 用于调度消息;在延迟过去之前,不会执行已发布的事件。

 services.AddMassTransit(x =>
            {
                x.AddDelayedMessageScheduler();
                x.AddConsumer(typeof(ScheduleNotificationConsumer));
                x.AddConsumer(typeof(SendNotificationConsumer));
                x.UsingInMemory((context, cfg) =>
                {



                    cfg.UseDelayedMessageScheduler();

                    cfg.ConfigureEndpoints(context);
                });
            });

尝试对 AmazonSQS 传输执行相同操作 未按预期工作,因为计划的消息在事件发布后立即执行。

  services.AddMassTransit(x =>
            {

                x.AddDelayedMessageScheduler();
                // add all consumers in the specified assembly
                x.AddConsumer(typeof(ScheduleNotificationConsumer));
                x.AddConsumer(typeof(SendNotificationConsumer));

                x.UsingAmazonSqs((context, cfg) =>
                {

                    cfg.Host(region, h =>
                    {
                        h.AccessKey(ACCESS_KEY_ID);
                        h.SecretKey(SECRET_ACCESS_KEY);
                        // scope topics as well
                        h.EnableScopedTopics();

                    });
                    cfg.UseDelayedMessageScheduler();
                    cfg.ConfigureEndpoints(context);

                });

            });

我可能做错了什么?我有源代码在https://github.com/makafanpeter/Demo.ScheduleMessages

标签: c#asp.net-core.net-coreamazon-sqsmasstransit

解决方案


推荐阅读