首页 > 解决方案 > FlinkKafkaConsumer 从主题开始读取

问题描述

我正在尝试将 kafka 主题作为 Flink 中的数据流读取。我正在FlinkKafkaConsumer阅读该主题。

我面临的问题是,经过几次测试后,我想从主题开始重新开始阅读以进行一些额外的测试。理想情况下,更改group.id和重新启动作业都应该足以完成此任务。

但是重启后,消费者仍然能够找到旧的 checkpoints/kafka.commit。我还尝试删除所有检查点删除所有 configMaps 和部署并重新启动所有内容,但同样的事情再次发生。我可以看到在任务管理器日志中设置的偏移量。

怎么从头再来读一遍题目?

2021-02-17 10:08:41,287 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator [] - [Consumer clientId=consumer-FlinkChangeConsumerNewAgain-2, groupId=FlinkChangeConsumerNewAgain] Discovered group coordinator idsp-cdp-qa-ehns-2.servicebus.windows.net:9093 (id: 2147483647 rack: null)
2021-02-17 10:08:41,324 INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator [] - [Consumer clientId=consumer-FlinkChangeConsumerNewAgain-2, groupId=FlinkChangeConsumerNewAgain] Setting offset for partition adhoc-testing-0 to the committed offset FetchPosition{offset=40204, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=idsp-cdp-qa-ehns-2.servicebus.windows.net:9093 (id: 0 rack: null), epoch=-1}}
2021-02-17 10:08:41,326 INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator [] - [Consumer clientId=consumer-FlinkChangeConsumerNewAgain-2, groupId=FlinkChangeConsumerNewAgain] Setting offset for partition adhoc-testing-1 to the committed offset FetchPosition{offset=39962, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=idsp-cdp-qa-ehns-2.servicebus.windows.net:9093 (id: 0 rack: null), epoch=-1}}
2021-02-17 10:08:41,328 INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator [] - [Consumer clientId=consumer-FlinkChangeConsumerNewAgain-2, groupId=FlinkChangeConsumerNewAgain] Setting offset for partition adhoc-testing-4 to the committed offset FetchPosition{offset=40444, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=idsp-cdp-qa-ehns-2.servicebus.windows.net:9093 (id: 0 rack: null), epoch=-1}}
2021-02-17 10:08:41,328 INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator [] - [Consumer clientId=consumer-FlinkChangeConsumerNewAgain-2, groupId=FlinkChangeConsumerNewAgain] Setting offset for partition adhoc-testing-2 to the committed offset FetchPosition{offset=40423, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=idsp-cdp-qa-ehns-2.servicebus.windows.net:9093 (id: 0 rack: null), epoch=-1}}
2021-02-17 10:08:41,328 INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator [] - [Consumer clientId=consumer-FlinkChangeConsumerNewAgain-2, groupId=FlinkChangeConsumerNewAgain] Setting offset for partition adhoc-testing-3 to the committed offset FetchPosition{offset=40368, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=idsp-cdp-qa-ehns-2.servicebus.windows.net:9093 (id: 0 rack: null), epoch=-1}}

标签: apache-flinkflink-streaming

解决方案


我认为问题不在于消费者能够找到旧的提交或旧的检查点,只要您从头开始而不是从保存点开始工作。

问题似乎是您没有auto.offset.reset在 Kafka Consumer 上设置,这意味着使用了默认值,即latest. 因此,每当您开始使用 new 工作时group.id,它将始终从提交给 Kafka 的最新偏移量开始。您可以通过简单地将auto.offset.reset属性集earliest传递给传递给 KafkaConsumer 的属性来更改它。


推荐阅读