首页 > 解决方案 > 重新订阅 Kafka-topic 并仅获取新消息

问题描述

我正在构建一个应用程序,我需要在其中即时订阅和取消订阅 Kafka 主题。问题是我找不到重新订阅主题的方法,因此我只能收到订阅后出现的该主题的新消息。

设置时,"auto.offset.reset": "latest"我只会在第一次订阅时收到新消息,而在以后的订阅中不会收到。

每次我需要订阅一个新主题时,我是否应该创建一个新的消费者组?

更新:

我尝试像这样设置消费者,这是正确的方法,但问题是我已经用我的 groupId 提交了偏移量。通过更改 groupId 解决了问题。

c, err := kafka.NewConsumer(&kafka.ConfigMap{
    "bootstrap.servers":  os.Getenv("KAFKA_BOOTSTRAP_SERVERS"),
    "group.id":           "foobar",
    "enable.auto.commit": false,
    "auto.offset.reset":  "latest",
})

标签: goapache-kafka

解决方案


这是使用消费者组偏移跟踪时需要考虑的关键部分

每天听某些主题几次,最多几分钟...对这些时期之间发生的消息不感兴趣

要始终让您的消费者“关注”主题,请enable.auto.commit=false与 一起设置auto.offset.reset=latest,并且根本不提交偏移量

否则,seekToEnd()是你想要的消费者方法


推荐阅读