reactive-programming - Webflux WebClient 异步请求和处理 Mono
问题描述
我是 webflux 的新手,无法找到合适的材料来继续实施。
我想发出请求并异步处理响应。在这种情况下,服务调用大约需要 8-10 毫秒来响应,因此我们发出请求并继续做其他工作,并在需要进一步处理时寻找响应。
Mono<Map<String,Price>> resp = webClient.post()
.uri("/{type}",isCustomerPricing ? "customer" : "profile")
.body(Mono.just(priceDetailsRequest),PriceDetailsRequest.class)
.retrieve().bodyToMono(customerPriceDetailsType);
我们如何使这个调用在不同的线程上异步执行。(我用 Schedulers.single/Scheuldes.parallel 尝试了subscriberOn),但是直到 Mono.block() 被调用后才看到调用被执行。
我们如何实现?
- 我们希望此调用在单独的线程上并行执行,以便当前线程可以继续其他工作
- 处理完成后,设置对上下文的响应
- 当前线程查找响应时,如果服务还没有完成,阻塞直到调用完成
解决方案
您无需阻止使用响应。只需分配一个操作员来使用同一链中的响应。下面给出一个例子。
Mono<Map<String,Price>> resp = webClient.post()
.uri("/{type}",isCustomerPricing ? "customer" : "profile")
.body(Mono.just(priceDetailsRequest),PriceDetailsRequest.class)
.retrieve()
.bodyToMono(CustomerPriceDetailsType.class)
.map(processor::responseToDatabaseEntity) // Create a persistable entity from the response
.map(priceRepository::save) // Save the entity to the database
.subscribe(); //This is to ensure that the flux is triggered.
或者,您可以提供消费者作为subscribe()
方法的参数。