首页 > 解决方案 > 通过 RxJava 缓存 API 响应

问题描述

所以我试图将我的 http 响应缓存到 ConcurrentHashMap 中。我已经设置了我的缓存类和 Api 客户端类来返回 Observables,如下所示:

public class UserCache {

    private static ConcurrentHashMap<Integer, User> cache = new ConcurrentHashMap<>();

    public Observable<User> get(Integer key) {
        return Observable.create(observableEmitter -> {
            if(cache.contains(key)) observableEmitter.onNext(cache.get(key));
            observableEmitter.onComplete();
        });
    }

    public void update(Integer key, User user) {
        cache.putIfAbsent(key, user);
    }

    public boolean contains(Integer key) {
        return cache.contains(key);
    }
}

ApiClient

public class ApiClient {

    private UserApi api;
    private static ApiClient apiClient;

    private ApiClient() {
        HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor();
        interceptor.setLevel(HttpLoggingInterceptor.Level.BASIC);
        OkHttpClient client = new OkHttpClient.Builder().addInterceptor(interceptor).build();

        Retrofit retrofit = new Retrofit.Builder()
                .baseUrl("https://jsonplaceholder.typicode.com")
                .client(client)
                .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                .addConverterFactory(GsonConverterFactory.create())
                .build();

        api = retrofit.create(UserApi.class);
    }

    public Observable<User> get(int id) {
        return api.getUser(id);
    }

    public static ApiClient getInstance() {
        if(apiClient == null) apiClient = new ApiClient();
        return apiClient;
    }
}

在 App 类中

public class App {

    ApiClient apiSource = ApiClient.getInstance();

    UserCache userCache = new UserCache();

    public Observable<User> get(Integer key) {
        return Observable.concat(userCache.get(key), apiSource.get(key))
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.computation())
                .doOnNext(user -> {
                    userCache.update(user.id, user);
                })
                .subscribeOn(Schedulers.io());
    }

    public static void main(String[] args) throws InterruptedException {
        App app = new App();
        app.get(1).subscribe(System.out::println);
        Thread.sleep(3000);
        app.get(1).subscribe(System.out::println);
        Thread.sleep(1000);
    }
}

我的理解.concat是,如果第一个 observable(缓存)不发射任何东西,那么第二个 Observable(api 客户端)将开始发射。但我不知道为什么doOnNext(user -> userCache.update(user.id, user))不更新缓存,因此当我检索相同的密钥时,会再次执行另一个 api 调用。

标签: javarx-javarx-java2

解决方案


我不确定你为什么doOnNext不发射,但如果你使用 RX,有一种方法需要更少的代码并消除竞争条件。在您给出的示例中,如果我们在空缓存上调用该方法两次,它将进行两次网络调用,最后一个将覆盖第一个。这是我的首选方法,它可以防止这种情况发生并且需要更少的代码:

private final ConcurrentMap<Integer, Observable<User>> userCache = Maps.newConcurrentMap();

public Observable<User> getUser(int id) {
    return userCache.computeIfAbsent(id, theId -> {
        ConnectableObservable<User> cachedObservable = getUserFromApi(id)
                .replay();
        cachedObservable.connect();
        return cachedObservable;
    }).doOnError(err -> userCache.remove(id));
}

如您所见,我存储缓存的 observables。这样,如果在第一次调用仍在运行时进行第二次调用,它们会得到相同的结果,并且只缓存一次。之后的所有调用都直接从缓存的 observable 中获取。

但是,我们不想缓存错误(可能),所以我附加了 adoOnError以确保任何包含错误(如网络故障)的 observable 也不会被缓存。


推荐阅读