首页 > 解决方案 > 从队列到队列消费和立即发布 RabbitMQ 消息的正确方法?

问题描述

使用消息、处理消息然后发布消息的正确方法是什么?我遇到了很多未确认的消息,我相信有一些阻塞正在发生。试图了解此类事情的最佳实践。

我正在开发一组每天将处理大约 50k 个请求的服务。我决定使用 RabbitMQ 和三个用 Dotnet Core 3.1 编写的 Windows 服务。

我已经绘制了该过程的图表,但本质上它是这样工作的:

代码示例如下图

在此处输入图像描述

protected override void OnStart(string[] args)
    {

        logger.LogInformation("Starting Service ...");

        base.OnStart(args);
        string queue = "Queue_StageOne";

        this.connection = factory.CreateConnection();
        this.channel = connection.CreateModel();
        this.publishingChannel = connection.CreateModel();
        this.channel.BasicQos(0, 1, false);

        consumer = new AsyncEventingBasicConsumer(channel);
        consumer.Received += Consumer_Recieved;

        this.channel.BasicConsume(queue: queue, autoAck: false, consumer: consumer);
    }

    private async Task Consumer_Recieved(object sender, BasicDeliverEventArgs @event)
    {
        var body = @event.Body;
        var message = Encoding.UTF8.GetString(body.ToArray());
        var inboundTransferObject = PatientObject.ConvertFromJson(message);

        //logger.LogInformation("Processed message " + inboundTransferObject.WebhookMessageId);

        //ServicePointManager.SecurityProtocol = SecurityProtocolType.SystemDefault;
        //X509Certificate2 cert = new X509Certificate2(config["CertificationPath"].ToString(), config["PFXPassword"]);

        //JToken access_token = GetAccessToken(cert);

        //JObject payerData = GetPractitionerData(inboundTransferObject, cert, access_token);

        //inboundTransferObject = ProcessPractitioner(inboundTransferObject, payerData);

        var outboundTransferObject = Encoding.ASCII.GetBytes(inboundTransferObject.ConvertToJson());

        channel.BasicAck(deliveryTag: @event.DeliveryTag, multiple: false);
        publishingChannel.BasicPublish(exchange: "ExchangeA", routingKey: "Queue_StageTwo", basicProperties: null, body:outboundTransferObject);
        await Task.Delay(250);

    }

标签: c#asynchronous.net-corerabbitmqwindows-services

解决方案


目前尚不清楚您在这里要问的确切内容,但确实突出的一件事是,您的服务不应该确认入站消息,除非并且直到它们完成所有处理步骤,包括发布后续出站消息。在您的代码示例中,您似乎在发布出站消息之前确认了入站消息。

但是,这并不能解释您描述的“我遇到了很多未确认的消息”的症状。你什么时候遇到这些?多少是很多?您是否对频道设置了预取限制?出于测试目的,您可以尝试将预取计数设置为 1,以确保一次只有一条消息在传输中。

channel.BasicQos(1, global: true)

请参阅RabbitMQ 文档的这一部分:

“由于消息是异步发送(推送)到客户端的,因此在任何给定时刻,通道上通常会有多个消息“在飞行中”。此外,来自客户端的手动确认本质上也是异步的。所以有一个滑动窗口未确认的交付标签。开发人员通常更愿意限制此窗口的大小以避免消费者端的无界缓冲区问题。这是通过使用 basic.qos 方法设置“预取计数”值来完成的。该值定义通道上允许的未确认传递的最大数量。一旦数量达到配置的计数,RabbitMQ 将停止在通道上传递更多消息,除非至少有一个未确认的消息被确认。


推荐阅读