首页 > 解决方案 > 使用 MassTransit 发布活动

问题描述

我正在尝试在一个微服务中发布消息并在另一个微服务中获取它,但无法使用MassTransit 5.5.3 和RabbitMQ来实现这一点

据我所知,我们不必创建ReceiveEndpoint即可发布事件,所以我只是在两个服务中创建相同的消息接口并发布消息,但正如我在RabbitMQ中看到的那样,它要么无处可去(如果未映射到队列)或转到“_skipped”队列。

出版商:

namespace Publisher
{
    class Program
    {
        static async Task Main(string[] args)
        {
            var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                IRabbitMqHost host = cfg.Host("host", "vhost", h =>
                {
                    h.Username("xxx");
                    h.Password("yyy");
                });
            });

            bus.Start();

            await bus.Publish<Message>(new { Text = "Hello World" });

            Console.ReadKey();

            bus.Stop();
        }
    }

    public interface Message
    {
        string Text { get; set; }
    }
}

消费者:

namespace Consumer
{
    class Program
    {
        static async Task Main(string[] args)
        {
            var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                IRabbitMqHost host = cfg.Host("host", "vhost", h =>
                {
                    h.Username("xxx");
                    h.Password("yyy");
                });

                cfg.ReceiveEndpoint(host, e =>
                {
                    e.Consumer<MbConsumer>();
                });
            });

            bus.Start();

            bool finish = false;

            while(!finish)
            {
                await Task.Delay(1000);
            }

            bus.Stop();
        }
    }

    public interface Message
    {
        string Text { get; set; }
    }

    public class MbConsumer : IConsumer<Message>
    {
        public async Task Consume(ConsumeContext<Message> context)
        {
            await Console.Out.WriteLineAsync(context.Message.Text);
        }
    }
}

我正在敦促消费者在消息发布后得到消息,但它没有得到它。我认为这是因为完整的消息类型不同(“Publisher.Message”与“Consumer.Message”),所以消息契约不同。我应该如何修复此代码以在消费者中获取事件?看起来我缺少一些关于RabbitMQMassTransit的基本信息。

标签: masstransit

解决方案


你的猜测是正确的。MassTransit 使用完全限定的类名称作为消息合同名称。MassTransit 还使用基于类型的路由,因此 FQCN 用于创建交换和绑定。

因此,如果您将消息类移动到单独的命名空间,例如:

namespace Messages
{
    public interface Message
    {
        string Text { get; set; }
    }
}

然后,您可以在发布消息时引用此类型

await bus.Publish<Messages.Message>(new { Text = "Hello World" });

并定义您的消费者

public class MbConsumer : IConsumer<Messages.Message>
{
    public async Task Consume(ConsumeContext<Message> context)
    {
        await Console.Out.WriteLineAsync(context.Message.Text);
    }
}

它会起作用的。

您可能还想查看 RMQ 管理 UI 以了解有关 MassTransit 拓扑的信息。使用您的代码,您将看到两个交换,一个Publisher.Message和另一个Consumer.Message,您的消费者队列绑定到Consumer.Message交换,但是您将消息发布到Publisher.Message交换,它们就消失了。

我还建议为您的接收端点指定一个有意义的端点名称:

cfg.ReceiveEndpoint(host, "MyConsumer", e =>
{
    e.Consumer<MbConsumer>();
});

推荐阅读