首页 > 解决方案 > react subscribe() 函数结束后需要开始一行代码

问题描述

val totalNumInst = TotalNumObj()

        devSupportService.sendAllTalktalkMessages(naverId)
        devSupportService.sendAllAutoDepositTalktalkMessages(naverId, totalNum)
        logger.info("${totalNumInst.totalNum}")
Mono<>
.doOnSuccess { }
.subscribe()

前两行执行几个 Mono<>.subscribe() 函数。在每个 Mono<> 的 .doOnSuccess{} 中,totalNum变量都在增加。在最后一行,我添加了一个显示totalNum. 但totalNum变量始终显示初始值 0。

我需要留下一个日志,显示 Mono<>.subscribe() 执行了多少次。感谢您阅读我的问题。

标签: kotlinreactor

解决方案


有两种方法可以解决您的问题。阻塞和非阻塞。

阻塞

创建一个countDownLatch,将其传递给sendAllTalktalkMessages和sendAllAutoDepositTalktalkMessages,然后等待它被锁定

val totalNumInst = TotalNumObj()
val latch = CountDownLatch(2)

devSupportService.sendAllTalktalkMessages(naverId, totalNumInst, latch)
devSupportService.sendAllAutoDepositTalktalkMessages(naverId, totalNumInst, latch)

if (!latch.await(30, TimeUnit.SECONDS)) {
    throw TimeoutException("Waiting timed out")
}
logger.info("${totalNumInst.totalNum}")

并添加latch.countDown()到每个doOnSuccess(但我建议在 doFinally 倒计时,以防链发送错误信号)

Mono<>
    .doOnSuccess { latch.countDown() }
    .subscribe()

这是阻塞解决方案,它反对反应式非阻塞概念。

非阻塞

使 sendAllTalktalkMessages 和 sendAllAutoDepositTalktalkMessages 返回 Mono 并压缩它们(此外,在这种情况下,您不需要将 totalNumInst 传递给它们)

Mono.zip(
    devSupportService.sendAllTalktalkMessages(naverId)
        .map { 1 }
        .onErrorResume { Mono.just(0) }
        .defaultIfEmpty(0),
    devSupportService.sendAllAutoDepositTalktalkMessages(naverId)
        .map { 1 }
        .onErrorResume { 0 }
        .defaultIfEmpty(0)
) { counter1, counter2 -> counter1 + counter2 }
.subscribe { totalNum -> logger.info("$totalNum") }

在这种实现中,您将每个成功计为 1,将每个错误或空信号计为 0。


推荐阅读