spring-webflux - 如何使用反应器有条件地重复或重试
问题描述
我使用 SpringBoot 和响应式编程与 Webflux。我想重复服务,直到数据可用(除了 null 之外返回的东西)
我有一个将一些数据插入数据库的服务,并且有第二个服务消耗数据。我想继续从第二个服务查询数据库,直到数据可用。下面的代码我正在尝试使用 Project Reactor 来实现这一点:
Mono<SubscriptionQueryResult<App, App>> subscriptionQuery = reactiveQueryGateway
.subscriptionQuery(new FindAppByIdQuery(appId), ResponseTypes.instanceOf(App.class), ResponseTypes.instanceOf(App.class));
subscriptionQuery
.filter(a -> Objects.nonNull(a.initialResult().block()))
.repeatWhen(Repeat.onlyIf(repeatContext -> true)
.exponentialBackoff(Duration.ofMillis(100), Duration.ofSeconds(100))
.timeout(Duration.ofSeconds(30))).subscribe();
在执行此操作时,我遇到了以下异常:
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread parallel-1
在浏览 webflux 文档时,我发现在 Reactor 线程中无法调用 block() 函数。这样的尝试会导致上述错误:
为了克服我在下面尝试的问题:
subscriptionQuery
.flatMap(a -> a.initialResult())
.filter(a -> Objects.nonNull(a))
.repeatWhen(Repeat.onlyIf(repeatContext -> true)
.exponentialBackoff(Duration.ofMillis(100), Duration.ofSeconds(100))
.timeout(Duration.ofSeconds(30)))
.subscribe();
但这并没有给我想要的结果,我想我错过了一些东西。任何人都可以请提出正确的方法来实现这一目标。
谢谢。
解决方案
让我试着在这个问题上帮助你。
事实上,最好的解决方法是在发送命令之前订阅它。这样,您就知道订阅查询何时发出更新。
我们有一个代码示例可以帮助您。
为了扩展这一点,您最感兴趣的部分应该是CommandController上的这个:
public <U> Mono<U> sendAndReturnUpdate(Object command, SubscriptionQueryResult<?, U> result) {
/* The trick here is to subscribe to initial results first, even it does not return any result
Subscribing to initialResult creates a buffer for updates, even that we didn't subscribe for updates yet
they will wait for us in buffer, after this we can safely send command, and then subscribe to updates */
return Mono.when(result.initialResult())
.then(Mono.fromCompletionStage(() -> commandGateway.send(command)))
.thenMany(result.updates())
.timeout(Duration.ofSeconds(5))
.next()
.doFinally(unused -> result.cancel());
/* dont forget to close subscription query on the end and add a timeout */
}
推荐阅读
- javascript - 在图表顶部绘制图像不适用于 ChartJS
- excel - Excel VB - Shell 对象和 Items 扩展属性
- mongodb - Mongodb [js] 未捕获的异常:SyntaxError:标识符在数字文字之后立即开始
- javascript - Discord.js - 如何将用户 ID 转换为会员
- java - Thymeleaf 既不是 BindingResult 也不是 bean 名称的普通目标对象
- python - 尝试将前/后插入我的堆栈
- javascript - 调用函数如何与切片函数一起使用?
- javascript - 逗号分隔的 6 位值的正则表达式
- node.js - 为什么在所有更新服务方法中都调用 Hook
- django - GCP:gunicorn.errors.HaltServer: