java - 通过 RxJava 缓存 API 响应
问题描述
所以我试图将我的 http 响应缓存到 ConcurrentHashMap 中。我已经设置了我的缓存类和 Api 客户端类来返回 Observables,如下所示:
public class UserCache {
private static ConcurrentHashMap<Integer, User> cache = new ConcurrentHashMap<>();
public Observable<User> get(Integer key) {
return Observable.create(observableEmitter -> {
if(cache.contains(key)) observableEmitter.onNext(cache.get(key));
observableEmitter.onComplete();
});
}
public void update(Integer key, User user) {
cache.putIfAbsent(key, user);
}
public boolean contains(Integer key) {
return cache.contains(key);
}
}
ApiClient
public class ApiClient {
private UserApi api;
private static ApiClient apiClient;
private ApiClient() {
HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor();
interceptor.setLevel(HttpLoggingInterceptor.Level.BASIC);
OkHttpClient client = new OkHttpClient.Builder().addInterceptor(interceptor).build();
Retrofit retrofit = new Retrofit.Builder()
.baseUrl("https://jsonplaceholder.typicode.com")
.client(client)
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.addConverterFactory(GsonConverterFactory.create())
.build();
api = retrofit.create(UserApi.class);
}
public Observable<User> get(int id) {
return api.getUser(id);
}
public static ApiClient getInstance() {
if(apiClient == null) apiClient = new ApiClient();
return apiClient;
}
}
在 App 类中
public class App {
ApiClient apiSource = ApiClient.getInstance();
UserCache userCache = new UserCache();
public Observable<User> get(Integer key) {
return Observable.concat(userCache.get(key), apiSource.get(key))
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.doOnNext(user -> {
userCache.update(user.id, user);
})
.subscribeOn(Schedulers.io());
}
public static void main(String[] args) throws InterruptedException {
App app = new App();
app.get(1).subscribe(System.out::println);
Thread.sleep(3000);
app.get(1).subscribe(System.out::println);
Thread.sleep(1000);
}
}
我的理解.concat
是,如果第一个 observable(缓存)不发射任何东西,那么第二个 Observable(api 客户端)将开始发射。但我不知道为什么doOnNext(user -> userCache.update(user.id, user))
不更新缓存,因此当我检索相同的密钥时,会再次执行另一个 api 调用。
解决方案
我不确定你为什么doOnNext
不发射,但如果你使用 RX,有一种方法需要更少的代码并消除竞争条件。在您给出的示例中,如果我们在空缓存上调用该方法两次,它将进行两次网络调用,最后一个将覆盖第一个。这是我的首选方法,它可以防止这种情况发生并且需要更少的代码:
private final ConcurrentMap<Integer, Observable<User>> userCache = Maps.newConcurrentMap();
public Observable<User> getUser(int id) {
return userCache.computeIfAbsent(id, theId -> {
ConnectableObservable<User> cachedObservable = getUserFromApi(id)
.replay();
cachedObservable.connect();
return cachedObservable;
}).doOnError(err -> userCache.remove(id));
}
如您所见,我存储缓存的 observables。这样,如果在第一次调用仍在运行时进行第二次调用,它们会得到相同的结果,并且只缓存一次。之后的所有调用都直接从缓存的 observable 中获取。
但是,我们不想缓存错误(可能),所以我附加了 adoOnError
以确保任何包含错误(如网络故障)的 observable 也不会被缓存。
推荐阅读
- android - 切换片段时 ItemTouchHelper 手势后崩溃
- angularjs - angularjs中的checkAll复选框不起作用
- laravel - 如何在 Laravel 单元测试中模拟多对多关系
- python - 如何仅提取文件名?
- java - Java Optional 接口:Optional 对象链上的嵌套条件
- c# - 在字符串中定义的我的类名的实例列表
- react-native - 在 webview react_native 中处理单击按钮或取消划线文本
- c++ - QML 参考错误 -
在 c++/QML 集成期间未定义 - reactjs - React 懒惰 vs Intersection 观察者 API
- azure - Azure AD:如何为应用设置自定义权限?