首页 > 解决方案 > 没有任何参数的 Flux.repeat() 的用例是什么?

问题描述

我正在查看以下代码,并对 repeat() 运算符在这里所做的事情感到困惑。

return inboundFlux
                .groupBy(record -> record.receiverOffset().topicPartition())
                .flatMap(partitionFlux -> partitionFlux
                        .concatMap(el -> Flux.just(el)
                                .doOnNext(receiverRecord -> {
                                    log.info("Starting to process {}", receiverRecord);
                                    messageProcessor.processMessage(receiverRecord);
                                    receiverRecord.receiverOffset().acknowledge();
                                    log.info("Message acknowledged");
                                })
                                .doOnError(e -> log.error("ERRRRRRROOORRRRRR"))
                                .retryWhen(Retry.backoff(3, Duration.ofSeconds(5)).maxBackoff(Duration.ofSeconds(20)).transientErrors(true))
                                .onErrorResume(e -> {
                                    // code to handle retry exhaustion
                                })
                        ).repeat()
                )
                .subscribeOn(scheduler)
                .subscribe(); 

标签: project-reactor

解决方案


推荐阅读