c# - Kafka 消费者(使用 kafka-net)默认从头获取所有消息
问题描述
我正在为我的 .net 项目构建一个 Kafka 消费者。我正在使用 kafka-net(来自 James Roland 的 Apache Kafka mady 的本地 c# 客户端)。
我遇到的问题是此代码(基于文档)默认情况下从头开始获取所有消息:
private void StartKafkaConsumer(string ipKafka, string portKafka, string topicKafka)
{
string topic = topicKafka;
Uri uri = new Uri($"http://{ipKafka}:{portKafka}");
var options = new KafkaOptions(uri);
using (var router = new BrokerRouter(options))
{
using (var consumer = new Consumer(new ConsumerOptions(topic, router)))
{
foreach (var message in consumer.Consume())
{
Console.WriteLine(Encoding.UTF8.GetString(message.Value));
}
}
}
}
...
StartKafkaConsumer("localhost", "9092", "test"); //this fetches messages sent weeks ago, since the creation of the 'test' topic
基本上,此代码与此命令的作用相同:
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
我想做的就是从客户端连接到 Apache 服务器时获取消息,而不是从一开始。我知道这是可能的,因为我尝试了最后一个没有“--from-beginning”部分的命令并且它有效。
任何建议将不胜感激。
解决方案
看看ConsumerOptions
方法。应该有一个设置属性/配置值的选项:
auto.offset.reset
将上述属性/配置设置为latest
. 那时,只要您连接未知/新的消费者组 id,消费者将默认从最新的偏移量开始。
但是,如果消费者组 id 已知(即,已经从该主题/分区至少消费过一次),它将尝试获取最后提交的偏移量 + 1。如果该偏移量不可用,因为它可能已经消失超过保留阈值,则默认为最新。
这是更详细的文档:
推荐阅读
- c++ - 与处于有点爆炸状态的串行端口通信
- entity-framework-core - 如何 EF.Property
方法适用于包含的查询 - css - 使用带有滚动事件的 margin-top 来修复一个在某些滚动区域中具有相对位置的 div 会导致滞后
- reactjs - 修改循环中生成的组件列表中的道具
- java - 将几个链表的内容添加到一个更大的链表中并不断更新它
- django - 停止自引用模型中的无限循环
- php - 如何验证格式的表单域 - 00.000.000.0-000.000
- javascript - 如何检查 nodeJS 中的前 3 个或 4 个元素是否为真?
- javascript - React Native 推送通知的问题
- r - 如何在R中获得没有网格的矩形主题?