apache-flink - FlinkKafkaConsumer 从主题开始读取
问题描述
我正在尝试将 kafka 主题作为 Flink 中的数据流读取。我正在FlinkKafkaConsumer
阅读该主题。
我面临的问题是,经过几次测试后,我想从主题开始重新开始阅读以进行一些额外的测试。理想情况下,更改group.id
和重新启动作业都应该足以完成此任务。
但是重启后,消费者仍然能够找到旧的 checkpoints/kafka.commit。我还尝试删除所有检查点删除所有 configMaps 和部署并重新启动所有内容,但同样的事情再次发生。我可以看到在任务管理器日志中设置的偏移量。
怎么从头再来读一遍题目?
2021-02-17 10:08:41,287 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator [] - [Consumer clientId=consumer-FlinkChangeConsumerNewAgain-2, groupId=FlinkChangeConsumerNewAgain] Discovered group coordinator idsp-cdp-qa-ehns-2.servicebus.windows.net:9093 (id: 2147483647 rack: null)
2021-02-17 10:08:41,324 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator [] - [Consumer clientId=consumer-FlinkChangeConsumerNewAgain-2, groupId=FlinkChangeConsumerNewAgain] Setting offset for partition adhoc-testing-0 to the committed offset FetchPosition{offset=40204, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=idsp-cdp-qa-ehns-2.servicebus.windows.net:9093 (id: 0 rack: null), epoch=-1}}
2021-02-17 10:08:41,326 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator [] - [Consumer clientId=consumer-FlinkChangeConsumerNewAgain-2, groupId=FlinkChangeConsumerNewAgain] Setting offset for partition adhoc-testing-1 to the committed offset FetchPosition{offset=39962, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=idsp-cdp-qa-ehns-2.servicebus.windows.net:9093 (id: 0 rack: null), epoch=-1}}
2021-02-17 10:08:41,328 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator [] - [Consumer clientId=consumer-FlinkChangeConsumerNewAgain-2, groupId=FlinkChangeConsumerNewAgain] Setting offset for partition adhoc-testing-4 to the committed offset FetchPosition{offset=40444, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=idsp-cdp-qa-ehns-2.servicebus.windows.net:9093 (id: 0 rack: null), epoch=-1}}
2021-02-17 10:08:41,328 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator [] - [Consumer clientId=consumer-FlinkChangeConsumerNewAgain-2, groupId=FlinkChangeConsumerNewAgain] Setting offset for partition adhoc-testing-2 to the committed offset FetchPosition{offset=40423, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=idsp-cdp-qa-ehns-2.servicebus.windows.net:9093 (id: 0 rack: null), epoch=-1}}
2021-02-17 10:08:41,328 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator [] - [Consumer clientId=consumer-FlinkChangeConsumerNewAgain-2, groupId=FlinkChangeConsumerNewAgain] Setting offset for partition adhoc-testing-3 to the committed offset FetchPosition{offset=40368, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=idsp-cdp-qa-ehns-2.servicebus.windows.net:9093 (id: 0 rack: null), epoch=-1}}
解决方案
我认为问题不在于消费者能够找到旧的提交或旧的检查点,只要您从头开始而不是从保存点开始工作。
问题似乎是您没有auto.offset.reset
在 Kafka Consumer 上设置,这意味着使用了默认值,即latest
. 因此,每当您开始使用 new 工作时group.id
,它将始终从提交给 Kafka 的最新偏移量开始。您可以通过简单地将auto.offset.reset
属性集earliest
传递给传递给 KafkaConsumer 的属性来更改它。
推荐阅读
- javascript - 根据条件禁用 Dynamics CRM 可编辑子网格中的列
- ios - 仅在首次初始化自定义 UIView 时如何在 LayoutSubviews 中执行代码
- android - 是否可以用新数据更新 MPAndroidChart?
- excel - 有没有一种方法可以通过字符串变量将工作表的名称传递给 Sheet.select 函数?
- ios - 当我尝试使用 tableView(moveRowAt:) 重新排序行时,TableView 行高缩小
- google-sheets - 我在工作表中的条件格式范围内做错了什么?
- python - Groupby 在具有重叠组的列上
- meteor - 在 /public 中为 Meteor 应用程序创建一个临时文件
- c# - 是否可以使用原始 SQL 编写 LINQ?
- unit-testing - 抽象类的模拟行为