c# - 如何解决 masstransit.rabbitmq(5.3.2)、rabbitmq.client(5.1.0) 的大量发布消息的 541 问题
问题描述
我将 Masstransit.RabbitMQ 与我的 dotnet 核心应用程序一起使用。我正在构建一个包含 Masstransit 的框架,如果需要,将来可以即插即用其他服务总线。
我在以下 LOC 中遇到错误:
TaskUtil.Await(BusControl.Publish<Message<TMessage>>(message, c => { c.Durable = true; }));
它通常工作正常。但是当我尝试发布 50000 多条消息时,在正确传递 80% 的消息后,我收到以下错误:
已经关闭:AMQP 操作被中断:AMQP 关闭原因,>由库发起,code=541,text="Unexpected Exception",classId=0,>methodId=0,cause=System.IO.IOException:无法读取来自 >transport 连接的数据:现有连接被 >remote 主机强行关闭。---> System.Net.Sockets.SocketException: An existing connection > wasforced closeed by remote host at System.Net.Sockets.NetworkStream.Read(Byte[] buffer, Int32 offset, >Int32 size) --- End内部异常堆栈跟踪 --- 在 RabbitMQ.Client.Framing.Impl.Connection.MainLoopIteration() 在 RabbitMQ.Client.Framing.Impl.Connection.MainLoop() 的 RabbitMQ.Client.Impl.InboundFrame.ReadFrom(NetworkBinaryReader reader)
我正在使用单例来创建连接。不知道为什么连接在一个点后失败。我的目标是构建一个能够处理任意数量消息的可靠框架。
我尝试添加 5 秒的心跳并将 PublishConfirms 设置为 true。它只是最小化了 TAT 以引发错误。仅供参考,Prefetch 设置为 16。我检查了许多面临类似问题的用户,但找不到解决方案。
return MassTransit.Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri(settings.RabbitMQURL), hst =>
{
hst.Username(settings.RabbitMQUserName);
hst.Password(settings.RabbitMQPassword);
hst.Heartbeat(5);
hst.PublisherConfirmation = true;
});
foreach (Action<IRabbitMqBusFactoryConfigurator, IRabbitMqHost> action in settings.BeforeBuildActions)
{
action.Invoke(cfg, host);
}
});
我希望成功发布和使用 100000 条消息。但是我在中途遇到错误。
解决方案
推荐阅读
- tfs - 将 TFS 中的评论解析限制为仅作者
- c# - 没有装箱值实例的接口的泛型和使用
- c# - 如何有效地编辑数据库中的数据?
- python - Python mock 返回一个 MagicMock 对象而不是指定的结果值
- java - 对 url 的 GET 请求(接受条款和条件)返回与响应正文相同的页面,但在 POSTMAN 中有效
- git - 我可以在 GitHub Enterprise 中跟踪 repo 克隆活动吗?
- python - 根据python中的条件制作列表以创建唯一列表
- javascript - 如何使用 jquery/javascript 设置弹出宽度最大宽度
- c++ - 从 lineEdit QT 获取 unicode 并将其写入文件
- spotfire - 对于满足特定条件的值,同一列中的行之间的差异