java - spring cloud stream kafka binder以编程方式将kafka主题(多个分区)偏移重置为任意数字
问题描述
我需要将偏移量重置为一个数字。
详细要求:我的应用程序正在消耗来自 kafka 主题的消息并将其转储到 DB 中,假设 DB 在处理消耗的消息时关闭(偏移量 = 10),直到 DB 关闭时,应用程序消耗消息直到偏移量 20。
现在 DB 在处理第 20 个偏移量消息时再次出现,现在我想再次将偏移量重置为 10,以便我可以将数据保存在数据库中。
我可以以编程方式实现这一点(弹簧启动)吗?我正在使用spring cloud stream binder kafka。
解决方案
如果数据库关闭,为什么还要继续接收消息?
只需配置侦听器容器以继续重试当前记录,直到数据库再次可用。
@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> listenerContainerCustomizer() {
return (container, topic, group) -> {
container.setErrorHandler(...);
};
}
见https://docs.spring.io/spring-kafka/docs/current/reference/html/#seek-to-current
推荐阅读
- python - ImportError: cannot import name 'string_int_label_map_pb2' (尝试了网络上的所有解决方案)
- python - 具有无限求和的 Python 曲线拟合
- autohotkey - 在不重新显示 GUI 的情况下更改 GUI 中显示的数据
- php - 在 PHP 中创建跨范围变量
- reactjs - Reactjs:如何从 submitHandler 访问上下文?
- python-3.x - Lambda 变量未定义。Python
- redis - 带有redis的Apache梁-选择数据库并从哈希中读取?
- javascript - 如何从 rxjs 中的可观察对象数组中解析 html 元素
- kotlin - Groovy 无法识别 spock 中的 kotlin 构造函数
- javascript - 如何展开和折叠单字符数组?