首页 > 解决方案 > 如何解决 masstransit.rabbitmq(5.3.2)、rabbitmq.client(5.1.0) 的大量发布消息的 541 问题

问题描述

我将 Masstransit.RabbitMQ 与我的 dotnet 核心应用程序一起使用。我正在构建一个包含 Masstransit 的框架,如果需要,将来可以即插即用其他服务总线。

我在以下 LOC 中遇到错误:

    TaskUtil.Await(BusControl.Publish<Message<TMessage>>(message, c => { c.Durable = true; }));

它通常工作正常。但是当我尝试发布 50000 多条消息时,在正确传递 80% 的消息后,我收到以下错误:

已经关闭:AMQP 操作被中断:AMQP 关闭原因,>由库发起,code=541,text="Unexpected Exception",classId=0,>methodId=0,cause=System.IO.IOException:无法读取来自 >transport 连接的数据:现有连接被 >remote 主机强行关闭。---> System.Net.Sockets.SocketException: An existing connection > wasforced closeed by remote host at System.Net.Sockets.NetworkStream.Read(Byte[] buffer, Int32 offset, >Int32 size) --- End内部异常堆栈跟踪 --- 在 RabbitMQ.Client.Framing.Impl.Connection.MainLoopIteration() 在 RabbitMQ.Client.Framing.Impl.Connection.MainLoop() 的 RabbitMQ.Client.Impl.InboundFrame.ReadFrom(NetworkBinaryReader reader)

我正在使用单例来创建连接。不知道为什么连接在一个点后失败。我的目标是构建一个能够处理任意数量消息的可靠框架。

我尝试添加 5 秒的心跳并将 PublishConfirms 设置为 true。它只是最小化了 TAT 以引发错误。仅供参考,Prefetch 设置为 16。我检查了许多面临类似问题的用户,但找不到解决方案。

        return MassTransit.Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            var host = cfg.Host(new Uri(settings.RabbitMQURL), hst =>
            {
                hst.Username(settings.RabbitMQUserName);
                hst.Password(settings.RabbitMQPassword);
                hst.Heartbeat(5);
                hst.PublisherConfirmation = true;
            });


            foreach (Action<IRabbitMqBusFactoryConfigurator, IRabbitMqHost> action in settings.BeforeBuildActions)
            {
                action.Invoke(cfg, host);
            }
        });

我希望成功发布和使用 100000 条消息。但是我在中途遇到错误。

标签: c#rabbitmqmasstransit

解决方案


推荐阅读