首页 > 解决方案 > 在reactor-kafka中立即生效?

问题描述

尝试实现一个重试记录处理器,该处理器在发生瞬时故障时回溯到最后一条记录的偏移量,而不是在不确定的时间内进入内部重试循环,并且如果错误持续足够长的时间,则冒着消费者会话超时和重新平衡的风险。

        kafkaTemplate
            .receive()
...
            .concatMap(record -> {
                var postResponses = mapper.transform(record.value())
                                          .stream()
                                          .map(this::post)
                                          .toArray(Mono[]::new);
                return Mono.when(postResponses)
                           .doOnSuccess(__ -> record.receiverOffset().acknowledge())
                           .onErrorResume(ex ->
                               kafkaTemplate.seek(record.receiverOffset().topicPartition(), record.offset())
                                            .then(Mono.delay(someRetryinterval).then())
                           );
            }, 0) // no prefetch!
            .subscribe();

将最大轮询记录设置为 1,在此代码开始之前,一个主题中有两条记录,Mono。当一直失败时,我看到它反复寻找 0 和 1,而不是只读取第一条记录,直到它成功。

我错过了什么吗?如何在不切换到阻塞 API 的情况下完全序列化 poll 并寻求调用以处理记录,并根据需要进行尽可能多的重试?

期望具有 SeekToCurrentErrorHandler 的 @KafkaListener 的等效项也可以在 RK 中工作,但发现并非如此

标签: project-reactorreactor-kafka

解决方案


推荐阅读