首页 > 解决方案 > Kafka 消费者(使用 kafka-net)默认从头获取所有消息

问题描述

我正在为我的 .net 项目构建一个 Kafka 消费者。我正在使用 kafka-net(来自 James Roland 的 Apache Kafka mady 的本地 c# 客户端)。

我遇到的问题是此代码(基于文档)默认情况下从头开始获取所有消息:

private void StartKafkaConsumer(string ipKafka, string portKafka, string topicKafka)
    {
        string topic = topicKafka;
        Uri uri = new Uri($"http://{ipKafka}:{portKafka}");
        var options = new KafkaOptions(uri);


        using (var router = new BrokerRouter(options))
        {
            using (var consumer = new Consumer(new ConsumerOptions(topic, router)))
            {
                foreach (var message in consumer.Consume())
                {

                    Console.WriteLine(Encoding.UTF8.GetString(message.Value));
                }
            }
        }
    }
...
StartKafkaConsumer("localhost", "9092", "test");  //this fetches messages sent weeks ago, since the creation of the 'test' topic

基本上,此代码与此命令的作用相同:

.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

我想做的就是从客户端连接到 Apache 服务器时获取消息,而不是从一开始。我知道这是可能的,因为我尝试了最后一个没有“--from-beginning”部分的命令并且它有效。

任何建议将不胜感激。

标签: c#apache-kafka

解决方案


看看ConsumerOptions方法。应该有一个设置属性/配置值的选项:

auto.offset.reset

将上述属性/配置设置为latest. 那时,只要您连接未知/新的消费者组 id,消费者将默认从最新的偏移量开始。

但是,如果消费者组 id 已知(即,已经从该主题/分区至少消费过一次),它将尝试获取最后提交的偏移量 + 1。如果该偏移量不可用,因为它可能已经消失超过保留阈值,默认为最新。

这是更详细的文档:

https://kafka.apache.org/documentation/#newconsumerconfigs


推荐阅读