masstransit - 使用 MassTransit 发布活动
问题描述
我正在尝试在一个微服务中发布消息并在另一个微服务中获取它,但无法使用MassTransit 5.5.3 和RabbitMQ来实现这一点。
据我所知,我们不必创建ReceiveEndpoint即可发布事件,所以我只是在两个服务中创建相同的消息接口并发布消息,但正如我在RabbitMQ中看到的那样,它要么无处可去(如果未映射到队列)或转到“_skipped”队列。
出版商:
namespace Publisher
{
class Program
{
static async Task Main(string[] args)
{
var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
IRabbitMqHost host = cfg.Host("host", "vhost", h =>
{
h.Username("xxx");
h.Password("yyy");
});
});
bus.Start();
await bus.Publish<Message>(new { Text = "Hello World" });
Console.ReadKey();
bus.Stop();
}
}
public interface Message
{
string Text { get; set; }
}
}
消费者:
namespace Consumer
{
class Program
{
static async Task Main(string[] args)
{
var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
IRabbitMqHost host = cfg.Host("host", "vhost", h =>
{
h.Username("xxx");
h.Password("yyy");
});
cfg.ReceiveEndpoint(host, e =>
{
e.Consumer<MbConsumer>();
});
});
bus.Start();
bool finish = false;
while(!finish)
{
await Task.Delay(1000);
}
bus.Stop();
}
}
public interface Message
{
string Text { get; set; }
}
public class MbConsumer : IConsumer<Message>
{
public async Task Consume(ConsumeContext<Message> context)
{
await Console.Out.WriteLineAsync(context.Message.Text);
}
}
}
我正在敦促消费者在消息发布后得到消息,但它没有得到它。我认为这是因为完整的消息类型不同(“Publisher.Message”与“Consumer.Message”),所以消息契约不同。我应该如何修复此代码以在消费者中获取事件?看起来我缺少一些关于RabbitMQ或MassTransit的基本信息。
解决方案
你的猜测是正确的。MassTransit 使用完全限定的类名称作为消息合同名称。MassTransit 还使用基于类型的路由,因此 FQCN 用于创建交换和绑定。
因此,如果您将消息类移动到单独的命名空间,例如:
namespace Messages
{
public interface Message
{
string Text { get; set; }
}
}
然后,您可以在发布消息时引用此类型
await bus.Publish<Messages.Message>(new { Text = "Hello World" });
并定义您的消费者
public class MbConsumer : IConsumer<Messages.Message>
{
public async Task Consume(ConsumeContext<Message> context)
{
await Console.Out.WriteLineAsync(context.Message.Text);
}
}
它会起作用的。
您可能还想查看 RMQ 管理 UI 以了解有关 MassTransit 拓扑的信息。使用您的代码,您将看到两个交换,一个Publisher.Message
和另一个Consumer.Message
,您的消费者队列绑定到Consumer.Message
交换,但是您将消息发布到Publisher.Message
交换,它们就消失了。
我还建议为您的接收端点指定一个有意义的端点名称:
cfg.ReceiveEndpoint(host, "MyConsumer", e =>
{
e.Consumer<MbConsumer>();
});
推荐阅读
- date - 如何从猪的全格式日期连接月份和日期
- node.js - 无法将后端的 Nodejs 和前端的 ReactJs 部署到 Heroku 的全栈项目
- reporting-services - 计算 SSRS 中的百分位数
- javascript - 使用数组 .forEach 函数迭代用户并使用 OPTION 元素填充 SELECT UI 元素
- javascript - 如何修复 DOMException:无法在“XMLHttpRequest”上执行“open”:调用第三方服务器/api 时 URL Angular HttpClient 无效?
- php - 间歇性 PHP 警告:“imap_open(): 无法打开流”
- ios - iMessage 键盘是什么类?
- react-native - React Native 如何传递包含列表的对象的列表以使用 FlatList?
- graphql - 在 Gatsby 中处理垂直和水平图像的优雅方式
- computational-geometry - 在CGAL中计算平行线