首页 > 解决方案 > 用于 Kafka 消费者的反应性 Spring Cloud 功能在没有流结束的情况下提交偏移量

问题描述

对于 Spring 云功能反应式 Kafka 消费者,消费者偏移量正在提交,而流仍然在队列中具有消息。这意味着在应用程序重新启动时,流队列中的消息会丢失,但与它们对应的偏移量会在 Kafka 中提交。

有没有办法解决这种行为?

这就是函数的定义方式(以及没有基于时间的自动提交的适当的 spring cloud 函数配置)。

@Bean
public Consumer<Flux<TData>> processor() {
    return flux -> {
        flux.delayElements(Duration.ofMillis(100))
                .flatMapSequential(this::doWork)
                .doOnError(throwable -> log.error("error: ", throwable))
                .onErrorResume(e -> Mono.empty())
                 .onErrorContinue((e, obj) -> log.error("Error while processing", e))
                .subscribe();
    }
}

标签: apache-kafkaspring-kafkaproject-reactorspring-cloud-function

解决方案


推荐阅读