spring - 从头开始阅读 Kafka 主题
问题描述
我有一个 spring boot 应用程序,它不断读取来自特定主题的所有消息。我的问题是我有一个问题,当应用程序重新启动时,它需要再次读取主题中的所有消息。我认为这两个选项结合起来可以做到,但它不起作用:
- 重置偏移:真
- startOffset:最早
这是我的方法;
@StreamListener(myTopic)
public void handle(@Payload Input input) {
/** Do other stuff not related **/
}
这是我的 application.yaml
spring:
cloud.stream:
bindings:
myTopic:
destination: TOPIC
content-type: application/**avro
group: group-topic
concurrency: 1
resetOffsets: true
startOffset: earliest
解决方案
您需要将group
application.yaml 中的 更改为新的唯一组名称(请参见下面的示例,其中我将使用者组设置为new-consumer-group-id:
spring:
cloud.stream:
bindings:
myTopic:
destination: TOPIC
content-type: application/**avro
group: new-consumer-group-id
concurrency: 1
resetOffsets: true
startOffset: earliest
如果继续使用同一个 ConsumerGroup,配置resetOffset
并startingOffset
不会产生任何影响。
作为替代方案,您可以使用命令行工具重置每个消费者组的偏移量kafka-consumer-groups.sh
。一个模板如下所示:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--execute --reset-offsets \
--group groupA\
--topic topicC \
--partition 0 \
--to-offset <insert number>
推荐阅读
- python - 无法从“typing_extensions”导入名称“TypeGuard”
- jquery - 声明的函数 ($.fn) 不是函数
- reactjs - NextJS:如何验证动态文件夹中的 ID?
- ios - 如何在 iOS 中同时使用自动高度和滚动视图?
- cmake - CMake CPPLINT 彩色输出
- node.js - node.js 护照 - 登录后批准指定路线
- ios - AVPlayer UIButton - 根据流是打开还是关闭显示播放或暂停图像按钮
- java - Keycloak Spring Security Adapter 如何使用公共 Keycloak 客户端验证访问令牌?
- python - 在所有 PC 处理器上并行运行函数 - Python
- azure - 如何获取发布管道中提供的秘密变量的值