c# - 卡夫卡消费者没有消费消息
问题描述
我是卡夫卡的新手。kafka 消费者没有从给定主题读取消息。我也在检查 kafka 控制台。它不工作。我不明白这个问题。它早些时候工作正常。
public string MessageConsumer(string brokerList, List<string> topics, CancellationToken cancellationToken)
{
//ConfigurationManager.AutoLoadAppSettings("", "", true);
string logKey = string.Format("ARIConsumer.StartPRoducer ==>Topics {0} Key{1} =>", "", string.Join(",", topics));
string message = string.Empty;
var conf = new ConsumerConfig
{
BootstrapServers = "localhost:9092",
GroupId = "23",
EnableAutoCommit = false,
AutoOffsetReset = AutoOffsetResetType.Latest,
};
using (var c = new Consumer<Ignore, string>(conf))
{
try
{
c.Subscribe(topics);
bool consuming = true;
// The client will automatically recover from non-fatal errors. You typically
// don't need to take any action unless an error is marked as fatal.
c.OnError += (_, e) => consuming = !e.IsFatal;
while (consuming)
{
try
{
TimeSpan timeSpan = new TimeSpan(0, 0, 5);
var cr = c.Consume(timeSpan);
// Thread.Sleep(5000);
if (cr != null)
{
message = cr.Value;
Console.WriteLine("Thread" + Thread.CurrentThread.ManagedThreadId + "Message : " + message);
CLogger.WriteLog(ELogLevel.INFO, $"Consumed message Partition '{cr.Partition}' at: '{cr.TopicPartitionOffset} thread: { Thread.CurrentThread.ManagedThreadId}'. Message: {message}");
//Console.WriteLine($"Consumed message Partition '{cr.Partition}' at: '{cr.TopicPartitionOffset}'. Topic: { cr.Topic} value :{cr.Value} Timestamp :{DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fff", CultureInfo.InvariantCulture)} GrpId: { conf.GroupId}");
c.Commit();
}
Console.WriteLine($"Calling the next Poll ");
}
catch (ConsumeException e)
{
CLogger.WriteLog(ELogLevel.ERROR, $"Error occured: {e.Error.Reason}");
Console.WriteLine($"Error occured: {e.Error.Reason}");
}
//consuming = false;
}
// Ensure the consumer leaves the group cleanly and final offsets are committed.
c.Close();
}
catch (Exception ex)
{
}
}
return message;
}
此代码有什么问题,或者 kafka 存在安装问题
解决方案
是否有生产者主动发送数据?
您的消费者从基于 AutoOffsetReset 的最新偏移量开始,因此它不会读取主题中的现有数据
控制台消费者也默认使用最新的偏移量
如果您没有更改 GroupId,那么您的消费者可能已经工作过一次,然后您消费了数据,然后提交了该组的偏移量。当consumer在同一个组中再次启动时,只会从topic的末尾,或者最后一次commit的offset处恢复
你也有一个空的catch (Exception ex)
,这可能隐藏了一些其他错误
推荐阅读
- flutter - 如何禁用按钮(图标按钮)点击?
- c++ - 渲染边缘上的 CGAL::Triangulation_3 毛刺
- r - R中的sqldf比较两个data.frames并存储输出
- codesys - Codesys LD:如何从一个进程跳转到另一个进程
- python - 尝试查找并单击时出现 Selenium TimeoutException
- java - 在android应用程序中集成gmail登录的问题?
- python - 如何创建一个从元组列表中获取随机元组并在 python 中生成基本图的程序?
- linux - 在新版本中使用 docker rootless
- php - Codeigniter - 如何获取为其提供重复值的列名
- angular - 错误的重定向到本地主机