.net-core - Confluent Kafka 消费者仅在更改 groupId 后才消费消息
问题描述
我有一个使用 Confluent.Kafka 的 .Net 核心控制台应用程序。我建立了一个消费者来消费来自特定主题的消息。该应用程序旨在每天运行几次,使用指定主题的消息并处理它们。
我花了一段时间来理解消费者的行为,但只有当它的 groupId 是一个以前从未使用过的消息时,它才会使用消息。每次我更改消费者的 groupId - 消费者都会获取订阅主题中的消息。但是在接下来的运行中——使用相同的 groupId——consumer.Consume 返回 null。
这种行为似乎与同一组消费者之间的重新平衡有关。但我不明白为什么——因为消费者应该只存在于整个应用程序运行时间。在离开应用程序之前,我调用了 consumer.close() 和 consumer.Dispose()。这些应该破坏消费者,以便在下一次运行时,当我创建消费者时,它将再次成为指定 groupId 上的第一个和单个消费者。但正如我所说,事实并非如此。
我知道有关于该主题的消息 - 我通过命令行检查它。而且我还确保主题只有 1 个分区。
最奇怪的是,我有另一个 .net 核心控制台应用程序,它执行相同的过程 - 并且完全没有问题。
我附上了 2 个应用程序的代码。
工作应用程序 - 总是消耗:
class Program
{
...
static void Main(string[] args)
{
if (args.Length != 2)
{
Console.WriteLine("Please provide topic name to read and SMTP topic name");
}
else
{
var services = new ServiceCollection();
services.AddSingleton<ConsumerConfig, ConsumerConfig>();
services.AddSingleton<ProducerConfig, ProducerConfig>();
var serviceProvider = services.BuildServiceProvider();
var cConfig = serviceProvider.GetService<ConsumerConfig>();
var pConfig = serviceProvider.GetService<ProducerConfig>();
cConfig.BootstrapServers = Environment.GetEnvironmentVariable("consumer_bootstrap_servers");
cConfig.GroupId = "confluence-consumer";
cConfig.EnableAutoCommit = true;
cConfig.StatisticsIntervalMs = 5000;
cConfig.SessionTimeoutMs = 6000;
cConfig.AutoOffsetReset = AutoOffsetReset.Earliest;
cConfig.EnablePartitionEof = true;
pConfig.BootstrapServers = Environment.GetEnvironmentVariable("producer_bootstrap_servers");
var consumer = new ConsumerHelper(cConfig, args[0]);
messages = new Dictionary<string, Dictionary<string, UserMsg>>();
var result = consumer.ReadMessage();
while (result != null && !result.IsPartitionEOF)
{
Console.WriteLine($"Current consumed msg-json: {result.Message.Value}");
...
result = consumer.ReadMessage();
}
consumer.Close();
Console.WriteLine($"Done consuming messages from topic {args[0]}");
}
}
类 ConsumerHelper.cs
namespace AggregateMailing
{
using System;
using Confluent.Kafka;
public class ConsumerHelper
{
private string _topicName;
private ConsumerConfig _consumerConfig;
private IConsumer<string, string> _consumer;
public ConsumerHelper(ConsumerConfig consumerConfig, string topicName)
{
try
{
_topicName = topicName;
_consumerConfig = consumerConfig;
var builder = new ConsumerBuilder<string, string>(_consumerConfig);
_consumer = builder.Build();
_consumer.Subscribe(_topicName);
}
catch (System.Exception exc)
{
Console.WriteLine($"Error on ConsumerHelper: {exc.ToString()}");
}
}
public ConsumeResult<string, string> ReadMessage()
{
Console.WriteLine("ReadMessage: start");
try
{
return _consumer.Consume();
}
catch (System.Exception exc)
{
Console.WriteLine($"Error on ReadMessage: {exc.ToString()}");
return null;
}
}
public void Close()
{
Console.WriteLine("Close: start");
try
{
_consumer.Close();
_consumer.Dispose();
}
catch (System.Exception exc)
{
Console.WriteLine($"Error on Close: {exc.ToString()}");
}
}
}
}
不工作的应用程序 - 仅在将使用者 groupId 更改为从未使用过的组后第一次运行时使用:
类 Program.cs
class Program
{
private static SmtpClient smtpClient;
private static Random random = new Random();
static void Main(string[] args)
{
try
{
var services = new ServiceCollection();
services.AddSingleton<ConsumerConfig, ConsumerConfig>();
services.AddSingleton<SmtpClient>(new SmtpClient("smtp.gmail.com"));
var serviceProvider = services.BuildServiceProvider();
var cConfig = serviceProvider.GetService<ConsumerConfig>();
cConfig.BootstrapServers = Environment.GetEnvironmentVariable("consumer_bootstrap_servers");
cConfig.GroupId = "smtp-consumer";
cConfig.EnableAutoCommit = true;
cConfig.StatisticsIntervalMs = 5000;
cConfig.SessionTimeoutMs = 6000;
cConfig.AutoOffsetReset = AutoOffsetReset.Earliest;
cConfig.EnablePartitionEof = true;
var consumer = new ConsumerHelper(cConfig, args[0]);
...
var result = consumer.ReadMessage();
while (result != null && !result.IsPartitionEOF)
{
Console.WriteLine($"current consumed message: {result.Message.Value}");
var msg = JsonConvert.DeserializeObject<EmailMsg>(result.Message.Value);
result = consumer.ReadMessage();
}
Console.WriteLine("Done sending emails consumed from SMTP topic");
consumer.Close();
}
catch (System.Exception exc)
{
Console.WriteLine($"Error on Main: {exc.ToString()}");
}
}
类 ConsumerHelper.cs
using Confluent.Kafka;
using System;
using System.Collections.Generic;
namespace Mailer
{
public class ConsumerHelper
{
private string _topicName;
private ConsumerConfig _consumerConfig;
private IConsumer<string, string> _consumer;
public ConsumerHelper(ConsumerConfig consumerConfig, string topicName)
{
try
{
_topicName = topicName;
_consumerConfig = consumerConfig;
var builder = new ConsumerBuilder<string, string> (_consumerConfig);
_consumer = builder.Build();
_consumer.Subscribe(_topicName);
//_consumer.Assign(new TopicPartition(_topicName, 0));
}
catch (System.Exception exc)
{
Console.WriteLine($"Error on ConsumerHelper: {exc.ToString()}");
}
}
public ConsumeResult<string, string> ReadMessage()
{
Console.WriteLine("ConsumeResult: start");
try
{
return _consumer.Consume();
}
catch (System.Exception exc)
{
Console.WriteLine($"Error on ConsumeResult: {exc.ToString()}");
return null;
}
}
public void Close()
{
Console.WriteLine("Close: start");
try
{
_consumer.Close();
_consumer.Dispose();
}
catch (System.Exception exc)
{
Console.WriteLine($"Error on Close: {exc.ToString()}");
}
Console.WriteLine("Close: end");
}
}
}
解决方案
推荐阅读
- for-loop - 在 v-tabs 中使用 v-for 进行三级迭代
- excel - “此时无法进入中断模式”- 运行时错误“1004”:
- javascript - 需要从两个符号之间删除空格
- php - Spatie/pdf-to-image 获取图像数量总是返回 0
- java - SQL 语句执行错误说'_HP_KYCMS18837P51144'附近的sunataxt 不正确
- python - 选择特定列表列表的多个索引
- c# - 如何查找具有开头模式并包含模式的字符串
- java - 如何在 Spring 中使用自动装配的 bean 创建简单工厂模式?
- python - 消除重复值熊猫
- windows - 如何在 Visual Studio c++ 项目中指定“任何大于 10.0 的 Windows SDK 版本”?