首页 > 解决方案 > 如何使用反应器有条件地重复或重试

问题描述

我使用 SpringBoot 和响应式编程与 Webflux。我想重复服务,直到数据可用(除了 null 之外返回的东西)

我有一个将一些数据插入数据库的服务,并且有第二个服务消耗数据。我想继续从第二个服务查询数据库,直到数据可用。下面的代码我正在尝试使用 Project Reactor 来实现这一点:

Mono<SubscriptionQueryResult<App, App>> subscriptionQuery = reactiveQueryGateway
.subscriptionQuery(new FindAppByIdQuery(appId), ResponseTypes.instanceOf(App.class), ResponseTypes.instanceOf(App.class));

subscriptionQuery
  .filter(a -> Objects.nonNull(a.initialResult().block()))
  .repeatWhen(Repeat.onlyIf(repeatContext -> true)
  .exponentialBackoff(Duration.ofMillis(100), Duration.ofSeconds(100))
  .timeout(Duration.ofSeconds(30))).subscribe();

在执行此操作时,我遇到了以下异常:

reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread parallel-1

在浏览 webflux 文档时,我发现在 Reactor 线程中无法调用 block() 函数。这样的尝试会导致上述错误:

为了克服我在下面尝试的问题:

subscriptionQuery
 .flatMap(a -> a.initialResult())
 .filter(a -> Objects.nonNull(a))
 .repeatWhen(Repeat.onlyIf(repeatContext -> true)
 .exponentialBackoff(Duration.ofMillis(100), Duration.ofSeconds(100))
 .timeout(Duration.ofSeconds(30)))
 .subscribe();

但这并没有给我想要的结果,我想我错过了一些东西。任何人都可以请提出正确的方法来实现这一目标。

谢谢。

标签: spring-webfluxrepeatproject-reactorreactoraxon

解决方案


让我试着在这个问题上帮助你。

事实上,最好的解决方法是在发送命令之前订阅它。这样,您就知道订阅查询何时发出更新。

我们有一个代码示例可以帮助您。

为了扩展这一点,您最感兴趣的部分应该是CommandController上的这个:

public <U> Mono<U> sendAndReturnUpdate(Object command, SubscriptionQueryResult<?, U> result) {
    /* The trick here is to subscribe to initial results first, even it does not return any result
     Subscribing to initialResult creates a buffer for updates, even that we didn't subscribe for updates yet
     they will wait for us in buffer, after this we can safely send command, and then subscribe to updates */
    return Mono.when(result.initialResult())
               .then(Mono.fromCompletionStage(() -> commandGateway.send(command)))
               .thenMany(result.updates())
               .timeout(Duration.ofSeconds(5))
               .next()
               .doFinally(unused -> result.cancel());
    /* dont forget to close subscription query on the end and add a timeout */
}

推荐阅读