首页 > 解决方案 > 在 Spring WebFlux Web 应用程序中缓存来自 WebClient 调用的 Mono 结果

问题描述

我正在寻找缓存一个Mono(仅当它成功时),它是 WebClient 调用的结果。

通过阅读项目反应堆插件文档,我认为CacheMono并不适合,因为它也缓存了我不想要的错误。

因此,我不使用CacheMono我正在执行以下操作:

Cache<MyRequestObject, Mono<MyResponseObject>> myCaffeineCache = 
    Caffeine.newBuilder()
            .maximumSize(100)
            .expireAfterWrite(Duration.ofSeconds(60))
            .build();

MyRequestObject myRequestObject = ...;

Mono<MyResponseObject> myResponseObject = myCaffeineCache.get(myRequestObject,
    requestAsKey -> WebClient.create()
                             .post()
                             .uri("http://www.example.com")
                             .syncBody(requestAsKey)
                             .retrieve()
                             .bodyToMono(MyResponseObject.class)
                             .cache()
                             .doOnError(t -> myCaffeineCache.invalidate(requestAsKey)));

在这里,我调用缓存Mono,然后将其添加到咖啡因缓存中。

任何错误都会进入doOnError使缓存失效。

这是缓存MonoWebClient 响应的有效方法吗?

标签: spring-webfluxproject-reactor

解决方案


这是您实际上被允许调用非反应性库并用反应性类型包装它们并在副作用运算符中完成处理的极少数用例之一,例如doOnXYZ,因为:

  • 咖啡因是一种内存缓存,据我所知不涉及 I/O
  • 缓存通常不能为缓存值提供强有力的保证(它非常“一劳永逸”)

然后,在这种情况下,您可以查询缓存以查看是否存在缓存版本(包装并立即返回),并在doOn运算符中缓存成功的真实响应,如下所示:

public class MyService {

    private WebClient client;

    private Cache<MyRequestObject, MyResponseObject> myCaffeineCache;

    public MyService() {
        this.client = WebClient.create();
        this.myCaffeineCache = Caffeine.newBuilder().maximumSize(100)
          .expireAfterWrite(Duration.ofSeconds(60)).build();
    }

    public Mono<MyResponseObject> fetchResponse(MyRequestObject request) {

        MyResponseObject cachedVersion = this.myCaffeineCache.get(myRequestObject);
        if (cachedVersion != null) {
           return Mono.just(cachedVersion);
        } else {
           return this.client.post()
                         .uri("http://www.example.com")
                         .syncBody(request.getKey())
                         .retrieve()
                         .bodyToMono(MyResponseObject.class)
                         .doOnNext(response -> this.myCaffeineCache.put(request.getKey(), response));
    }
}

请注意,我不会在这里缓存反应类型,因为一旦缓存返回值,就不会涉及 I/O,也不会产生背压。相反,订阅和其他反应流约束使事情变得更加困难。

此外,您对cache运营商的看法是正确的,因为它不是关于缓存值本身,而是更多关于重播发生在其他订阅者身上的事情。我相信cacheandreplay运算符实际上是Flux.


推荐阅读