首页 > 解决方案 > Confluent Kafka 消费者仅在更改 groupId 后才消费消息

问题描述

我有一个使用 Confluent.Kafka 的 .Net 核心控制台应用程序。我建立了一个消费者来消费来自特定主题的消息。该应用程序旨在每天运行几次,使用指定主题的消息并处理它们。

我花了一段时间来理解消费者的行为,但只有当它的 groupId 是一个以前从未使用过的消息时,它才会使用消息。每次我更改消费者的 groupId - 消费者都会获取订阅主题中的消息。但是在接下来的运行中——使用相同的 groupId——consumer.Consume 返回 null。

这种行为似乎与同一组消费者之间的重新平衡有关。但我不明白为什么——因为消费者应该只存在于整个应用程序运行时间。在离开应用程序之前,我调用了 consumer.close() 和 consumer.Dispose()。这些应该破坏消费者,以便在下一次运行时,当我创建消费者时,它将再次成为指定 groupId 上的第一个和单个消费者。但正如我所说,事实并非如此。

我知道有关于该主题的消息 - 我通过命令行检查它。而且我还确保主题只有 1 个分区。

最奇怪的是,我有另一个 .net 核心控制台应用程序,它执行相同的过程 - 并且完全没有问题。

我附上了 2 个应用程序的代码。

工作应用程序 - 总是消耗:

class Program
    {
        ...
        
        static void Main(string[] args)
        {
            if (args.Length != 2)
            {
                Console.WriteLine("Please provide topic name to read and SMTP topic name");
            }
            else
            {
                var services = new ServiceCollection();
                services.AddSingleton<ConsumerConfig, ConsumerConfig>();
                services.AddSingleton<ProducerConfig, ProducerConfig>();

                var serviceProvider = services.BuildServiceProvider();

                var cConfig = serviceProvider.GetService<ConsumerConfig>();
                var pConfig = serviceProvider.GetService<ProducerConfig>();

                cConfig.BootstrapServers = Environment.GetEnvironmentVariable("consumer_bootstrap_servers");
                cConfig.GroupId = "confluence-consumer";
                cConfig.EnableAutoCommit = true;
                cConfig.StatisticsIntervalMs = 5000;
                cConfig.SessionTimeoutMs = 6000;
                cConfig.AutoOffsetReset = AutoOffsetReset.Earliest;
                cConfig.EnablePartitionEof = true;

                pConfig.BootstrapServers = Environment.GetEnvironmentVariable("producer_bootstrap_servers");

                var consumer = new ConsumerHelper(cConfig, args[0]);

                messages = new Dictionary<string, Dictionary<string, UserMsg>>();

                var result = consumer.ReadMessage();
                while (result != null && !result.IsPartitionEOF)
                {
                    Console.WriteLine($"Current consumed msg-json: {result.Message.Value}");

                    ...
                    
                    result = consumer.ReadMessage();
                }

                consumer.Close();
                Console.WriteLine($"Done consuming messages from topic {args[0]}");


            }

        }

类 ConsumerHelper.cs

namespace AggregateMailing
{
    using System;
    using Confluent.Kafka;
    public class ConsumerHelper
    {
        private string _topicName;
        private ConsumerConfig _consumerConfig;
        private IConsumer<string, string> _consumer;

        public ConsumerHelper(ConsumerConfig consumerConfig, string topicName)
        {
            try
            {
                _topicName = topicName;
                _consumerConfig = consumerConfig;

                var builder = new ConsumerBuilder<string, string>(_consumerConfig);
                _consumer = builder.Build();

                _consumer.Subscribe(_topicName);
            }
            catch (System.Exception exc)
            {
                Console.WriteLine($"Error on ConsumerHelper: {exc.ToString()}");
            }
        }

        public ConsumeResult<string, string> ReadMessage()
        {
            Console.WriteLine("ReadMessage: start");
            try
            {
                return _consumer.Consume();
            }
            catch (System.Exception exc)
            {
                Console.WriteLine($"Error on ReadMessage: {exc.ToString()}");
                return null;
            }
        }

        public void Close()
        {
            Console.WriteLine("Close: start");
            try
            {
                _consumer.Close();
                _consumer.Dispose();
            }
            catch (System.Exception exc)
            {
                Console.WriteLine($"Error on Close: {exc.ToString()}");
            }
        }
    }
}

不工作的应用程序 - 仅在将使用者 groupId 更改为从未使用过的组后第一次运行时使用:

类 Program.cs

class Program
    {
        private static SmtpClient smtpClient;
        private static Random random = new Random();
        static void Main(string[] args)
        {
            try
            {
                var services = new ServiceCollection();
                services.AddSingleton<ConsumerConfig, ConsumerConfig>();
                services.AddSingleton<SmtpClient>(new SmtpClient("smtp.gmail.com"));

                var serviceProvider = services.BuildServiceProvider();

                var cConfig = serviceProvider.GetService<ConsumerConfig>();
                cConfig.BootstrapServers = Environment.GetEnvironmentVariable("consumer_bootstrap_servers");
                cConfig.GroupId = "smtp-consumer";
                cConfig.EnableAutoCommit = true;
                cConfig.StatisticsIntervalMs = 5000;
                cConfig.SessionTimeoutMs = 6000;
                cConfig.AutoOffsetReset = AutoOffsetReset.Earliest;
                cConfig.EnablePartitionEof = true;


                var consumer = new ConsumerHelper(cConfig, args[0]);

                ...
                
                var result = consumer.ReadMessage();
                while (result != null && !result.IsPartitionEOF)
                {
                    Console.WriteLine($"current consumed message: {result.Message.Value}");
                    var msg = JsonConvert.DeserializeObject<EmailMsg>(result.Message.Value);

                    result = consumer.ReadMessage();
                }

                Console.WriteLine("Done sending emails consumed from SMTP topic");
                consumer.Close();
            }
            catch (System.Exception exc)
            {
                Console.WriteLine($"Error on Main: {exc.ToString()}");
            }

        }

类 ConsumerHelper.cs

using Confluent.Kafka;
using System;
using System.Collections.Generic;

namespace Mailer
{
    public class ConsumerHelper
    {
        private string _topicName;
        private ConsumerConfig _consumerConfig;
        private IConsumer<string, string> _consumer;
        public ConsumerHelper(ConsumerConfig consumerConfig, string topicName)
        {
            try
            {
               _topicName = topicName;
               _consumerConfig = consumerConfig;

               var builder = new ConsumerBuilder<string, string> (_consumerConfig);
               _consumer = builder.Build();

               _consumer.Subscribe(_topicName);
               //_consumer.Assign(new TopicPartition(_topicName, 0));
            }
            catch (System.Exception exc)
            {
                Console.WriteLine($"Error on ConsumerHelper: {exc.ToString()}");
            }
        }
        public ConsumeResult<string, string> ReadMessage()
        {
            Console.WriteLine("ConsumeResult: start");
            try
            {
                
                return _consumer.Consume();
            }
            catch (System.Exception exc)
            {
                Console.WriteLine($"Error on ConsumeResult: {exc.ToString()}");
                return null;
            }
        }
        public void Close()
        {
            Console.WriteLine("Close: start");
            try
            {
                _consumer.Close();
                _consumer.Dispose();
            }
            catch (System.Exception exc)
            {
                Console.WriteLine($"Error on Close: {exc.ToString()}");
            }
            Console.WriteLine("Close: end");
        }

    }
}

标签: .net-coreapache-kafkaconfluent-kafka-dotnet

解决方案


推荐阅读