android - RxJava2 Flowable:将对象发送到服务器并检测结束
问题描述
我需要将对象列表发送到我的远程服务器。由于它们可能很多而且很大,我使用一个 flowable 使用 request(1) 从一个数组列表中一个一个地发送它们。
对于每个对象,都会对服务器进行改造调用,作为回报,我会获得远程 ID,并使用远程 ID 更新本地对象。
我需要检测此任务的结束:即发送的最后一个对象的最后响应,以防止对同一对象的多个并发调用。
目前一切正常,但我在远程服务器收到答案之前收到“已完成”消息,因此在更新对象之前。
我怎样才能做到这一点 ?
Flowable<Integer> observable = Flowable.range(0, objList.size());
observable.subscribe(new DefaultSubscriber<Integer>() {
@Override
public void onStart() {
Log.d(TAG, "on start");
request(1);
}
@Override
public void onNext(Integer t) {
Log.d(TAG, "on next : " + t);
MyObj = objList.get(t);
RetrofitHelper.createService(ObjService.class, true, authType, authToken).createOrUpdateObj(objList.get(t)).flatMap(p -> {
Log.d(TAG, "recu p");
if (p != null) {
try {
p.setSyncho(true);
// save remote id on obj
ObjDB.updateObj(p);
request(1);
return Observable.empty();
} catch (Throwable th) {
ExceptionHandler.logException(th);
return Observable.error(th);
}
} else {
request(1);
return Observable.empty(); // provisoirement si pb on renvoie vide
}
})
.onErrorResumeNext(r -> {
request(1);
Observable.empty();
})
.onExceptionResumeNext(error -> Observable.empty()) // go to next on error
.subscribeOn(Schedulers.io()).onErrorReturn(error -> {
Log.d("ERROR", error.getMessage());
return 0;
})
.onErrorResumeNext(Observable.empty()).subscribe();
}
@Override
public void onError(Throwable t) {
Log.e("XXX ERROR ", "" + t);
request(1);
patientSynchroInProgress = Boolean.FALSE;
}
@Override
public void onComplete() {
Log.e("XXX COMPLETE", "complete");
}
});
解决方案
最后,我能够让它工作
Flowable.fromIterable(patientList)
.concatMap(item -> {
item.setSomething();
return RetrofitHelper.createService(ObjService.class, true, authType, authToken)
.createOrUpdateObj(item)
.flatMap(p -> {
if (p != null) {
try {
p.setSyncho(true);
// save remote id on obj
ObjDB.updateObj(p);
return Observable.empty();
} catch (Throwable th) {
ExceptionHandler.logException(th);
return Observable.error(th);
}
} else {
return Observable.empty(); // provisoirement si pb on renvoie vide
}
})
.onErrorResumeNext(Observable.empty())
.toFlowable(BackpressureStrategy.BUFFER);
}
)
.doOnNext(s -> {
Log.d(TAG, ((Obj) s).toString());
})
.doOnComplete(() -> {
// do something when completed
Log.d(TAG, "COMPLETE");
})
.subscribe();
}
}
谢谢您的帮助
推荐阅读
- angular - Nativescript 图像缩略图
- python - 具有递归的生成器,列表列表的“死者”
- google-apps-script - 如何使用按钮在单元格中提交值,而焦点仍保留在该单元格中
- swift - 将结构的类型传递给协议绑定的函数
- javascript - EagleView/Pictometry:无法读取 i._walkAnnotations 处未定义的属性“isCollection”
- android - 使用 Rxjava 获取 Dialog 的输入值的最佳方法是什么?
- python - 初学者编码聊天机器人
- java - Spring Boot MVC 应用程序中的无效会话(注销)
- typescript - Typescript 类型的对象,其键是具有一个通用参数的函数?
- c# - 在 .net core WebAPI 项目中,哪些事件会导致 http 请求被取消?