Message lag when using Kafka

Problem description

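I am using Kafka as a message broker between two APIs. After some load testing I started noticing delays in the messages, so I tried tuning the configuration and then switched back to Confluent's own .NET library. I have now trimmed the code down so that it simply receives and sends messages and does nothing else, but I still see the delay when I run the load test, and I am at a loss as to how to fix it. Can anyone help?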

This is how I set up my consumer:

public abstract class BenchmarkConsumer<T> : BackgroundService where T : IntegrationEvent
    {
        private readonly IConsumer<Null, string> _consumer;
        private readonly IMediator _mediator;
        private readonly ILogger _logger;
        private readonly string _topic;
        private readonly string _host;

        protected BenchmarkConsumer(string host, string group, string topic, ILogger logger, IMediator mediator)
        {
            _host = host;
            _topic = topic;
            _logger = logger;
            _mediator = mediator;

            var consumerConfig = new ConsumerConfig
            {
                GroupId = group,
                BootstrapServers = host,
                SessionTimeoutMs = 6000,
                ConsumeResultFields = "none",
                QueuedMinMessages = 1000000,
                SecurityProtocol = SecurityProtocol.Plaintext,
                AutoOffsetReset = AutoOffsetReset.Earliest,
                Acks = Acks.Leader
            };

            _consumer = new ConsumerBuilder<Null, string>(consumerConfig).Build();
        }

        private async Task CreateTopic()
        {
            using var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = _host }).Build();
            try
            {
                await adminClient.CreateTopicsAsync(new[]
                    {
                        new TopicSpecification {Name = _topic, ReplicationFactor = 3, NumPartitions = 6}
                    }
                );
            }
            catch (CreateTopicsException e)
            {
                _logger.LogError($"Topic creation failed: ({_topic}): {e.Results[0].Topic}: {e.Results[0].Error.Reason}");
            }
        }

        public async Task StartConsumingAsync(CancellationToken cancellationToken)
        {
            await CreateTopic();

            _consumer.Subscribe(_topic);

            while (!cancellationToken.IsCancellationRequested)
            {
                var consumedResult = _consumer.Consume(cancellationToken);

                if (consumedResult == null) continue;

                var messageAsEvent = JsonSerializer.Deserialize<T>(consumedResult.Message.Value);

                _logger.LogInformation($"Message deserialized: {JsonSerializer.Serialize(messageAsEvent)}");

                await _mediator.Publish(messageAsEvent, CancellationToken.None);
            }
        }
    }

This is how I set up my producer:

public override void Produce(InitialOrderCreatedIntegrationEvent order)
        {
            var message = JsonSerializer.Serialize(order);

            Producer.Produce(_kafkaConfiguration.TopicOrderProducer, new Message<Null, string> {Value = message});
            _logger.LogInformation($"Message sent, OrderId: {order.OrderId}");
        }

Tags: c#, asp.net, apache-kafka

Solution
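
The question does not show the producer configuration, so this is only a guess at where to look, not a confirmed fix. The Confluent .NET client batches outgoing messages, and the `LingerMs` setting on `ProducerConfig` controls how long the producer waits to fill a batch before sending. Passing a delivery handler to `Produce` also shows when the broker acknowledged each message, which may help separate producer-side delay from consumer-side delay. In the sketch below, the broker address, topic name, payload, and `LingerMs` value are all placeholder assumptions.

```csharp
using System;
using Confluent.Kafka;

// Hypothetical values for illustration; the original post does not show
// the producer configuration.
var producerConfig = new ProducerConfig
{
    BootstrapServers = "localhost:9092", // assumption: replace with the real broker list
    Acks = Acks.Leader,                  // acks is a producer-side setting
    LingerMs = 0                         // assumption: do not wait to fill a batch
                                         // (lower latency, lower throughput)
};

using var producer = new ProducerBuilder<Null, string>(producerConfig).Build();

// The optional delivery handler is invoked once the broker has acknowledged
// (or rejected) the message.
producer.Produce(
    "orders",                                   // assumption: placeholder topic name
    new Message<Null, string> { Value = "{}" }, // assumption: placeholder payload
    report =>
    {
        if (report.Error.IsError)
            Console.WriteLine($"Delivery failed: {report.Error.Reason}");
        else
            Console.WriteLine(
                $"Delivered to {report.TopicPartitionOffset} at {report.Message.Timestamp.UtcDateTime:O}");
    });

// Wait for outstanding delivery reports before exiting.
producer.Flush(TimeSpan.FromSeconds(10));
```

If the delivery reports arrive quickly but messages still show up late, the consumer loop in the question is the next place to look: it logs and re-serializes every message and awaits `_mediator.Publish` before calling `Consume` again, so a slow handler holds up every message queued behind it.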

