首页 > 解决方案 > spring cloud stream kafka binder以编程方式将kafka主题(多个分区)偏移重置为任意数字

问题描述

我需要将偏移量重置为一个数字。

详细要求:我的应用程序正在消耗来自 kafka 主题的消息并将其转储到 DB 中,假设 DB 在处理消耗的消息时关闭(偏移量 = 10),直到 DB 关闭时,应用程序消耗消息直到偏移量 20。

现在 DB 在处理第 20 个偏移量消息时再次出现,现在我想再次将偏移量重置为 10,以便我可以将数据保存在数据库中。

我可以以编程方式实现这一点(弹簧启动)吗?我正在使用spring cloud stream binder kafka。

标签: javaspring-bootapache-kafkaspring-cloud-streamspring-cloud-stream-binder-kafka

解决方案


如果数据库关闭,为什么还要继续接收消息?

只需配置侦听器容器以继续重试当前记录,直到数据库再次可用。

@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> listenerContainerCustomizer() {
    return (container, topic, group) -> {
        container.setErrorHandler(...);
    };
}

https://docs.spring.io/spring-kafka/docs/current/reference/html/#seek-to-current


推荐阅读