apache-kafka - 用于 Kafka 消费者的反应性 Spring Cloud 功能在没有流结束的情况下提交偏移量
问题描述
对于 Spring 云功能反应式 Kafka 消费者,消费者偏移量正在提交,而流仍然在队列中具有消息。这意味着在应用程序重新启动时,流队列中的消息会丢失,但与它们对应的偏移量会在 Kafka 中提交。
有没有办法解决这种行为?
这就是函数的定义方式(以及没有基于时间的自动提交的适当的 spring cloud 函数配置)。
@Bean
public Consumer<Flux<TData>> processor() {
return flux -> {
flux.delayElements(Duration.ofMillis(100))
.flatMapSequential(this::doWork)
.doOnError(throwable -> log.error("error: ", throwable))
.onErrorResume(e -> Mono.empty())
.onErrorContinue((e, obj) -> log.error("Error while processing", e))
.subscribe();
}
}
解决方案
推荐阅读
- .net - 使用 .NET 项目的声纳扫描仪命令传递证书
- binary - 如何添加二进制列:1:值 > 2,否则为 0
- php - 如何设置 Laravel 生成具有所需权限的文件
- javascript - AWS POST 到 API Gateway 返回 415“不支持的媒体类型”
- python - 如何使 spaCy 匹配不区分大小写
- dialog - 如何直接在页面上加载触摸 Ui 对话框而不在 aem 中弹出对话框
- javascript - 将类添加到创建的元素
- python - 推理时仅获取 CSV 文件中最后图像的坐标。Python
- java - 尝试启动新活动时应用程序停止:AndroidRuntime: FATAL EXCEPTION
- python - 如何使用 python 在 DICOM 文件上应用最大强度投影和最小强度投影?